日常工作中需要大量、频繁地通过ssh登录服务器,查看、拉取相关信息或者对服务器进行变更。目前公司大量使用的是shell脚本,但是随着逻辑的复杂化、脚本管理的精细化,shell已经不能满足日常需求,于是我尝试整合工作中的需求,制作适合的工具。 由于管理制度的缺陷,我以工作流程为核心思考适合自己的运维方式,提升工作效率,把时间留给更有价值的事情。 完整代码在最后,请大家参考。
- 生产:4000+物理服务器,近 3000 台虚拟机。
- 开发环境:python3.6、redhat7.9,除了paramiko为第三方模块需要自己安装,其他的直接import即可。
- 配置变更:例如服务器上线时需要批量校正系统分区容量、及挂载数据盘。
- 配置信息查询过滤:例如过滤防火墙规则、过滤网卡配置。
- 存活检测:设置定时任务,定时轮询服务器的 ssh 状态是否正常。
- 文件传输:多个 ip 同时传输目录/文件。
- 批量操作前应当准备回退方案。
- 批量操作前应当先检查目标服务器相关配置的一致性。
- 批量操作时应当把重复执行会影响系统的操作和不影响的分开执行。
- 批量操作后应当再次验证操作结果是否符合预期。
- 关键方法一定要记录传入的参数以及执行后的结果。
- 为了避免方法返回值不符合预期,该抛异常的地方一定要抛。
- 优化代码,删去不必要的逻辑分支和尽量不要写重复的代码,使代码看起来整洁。
- 程序的执行情况、结果一定要保留日志。
1、ssh no existing session,sftp超时时间设置:
在代码无错的情况下大量ip出现No existing session,排查后定位在代码的写法上,下面是一个正确的示例。由于最开始没考虑到ssh连接的几种情况导致了重写好几遍。另外sftp的实例貌似不能直接设置连接超时时间,所以我采用了先建立ssh连接再打开sftp的方法。
import paramiko
# Module-level connection defaults read by the ssh_client() snippet below.
username = 'root'
port = 22
pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') # load the RSA key from the private-key file (runs at import time)
def ssh_client(ip, user=None, passwd=None, auth='id_rsa', timeout=10):
    """Open and return a connected paramiko SSHClient for ``ip``.

    Args:
        ip: Host to connect to.
        user: Login name, required when ``auth == 'noAuth'``.
        passwd: Password, required when ``auth == 'noAuth'``.
        auth: ``'id_rsa'`` logs in as the module-level ``username`` with the
            module-level ``pkey``; ``'noAuth'`` logs in with ``user``/``passwd``.
        timeout: TCP connect timeout in seconds passed to ``connect``.

    Returns:
        The connected client. The caller is responsible for closing it.

    Raises:
        NameError: ``auth`` is not one of the two supported modes.
        ValueError: ``auth == 'noAuth'`` but ``user`` or ``passwd`` is None.
    """
    # Validate the arguments before any client/network work so that a bad
    # call fails fast and never leaves a half-open client behind.
    if auth not in ('id_rsa', 'noAuth'):
        raise NameError('不存在此%s认证方式' % auth)
    if auth == 'noAuth' and (user is None or passwd is None):
        raise ValueError('传入的用户名密码不能为空')

    client = paramiko.SSHClient()
    # Unknown host keys are accepted automatically; convenient for bulk
    # operations, but it gives no protection against a spoofed host.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if auth == 'id_rsa':
            client.connect(ip, port, username, pkey=pkey, banner_timeout=60, timeout=timeout)
        else:
            client.connect(ip, port, user, passwd, banner_timeout=60, timeout=timeout)
    except Exception:
        # Do not leak the socket of a connection that never came up.
        client.close()
        raise
    return client
def sftp_client(ip, user=None, passwd=None, auth='id_rsa'):
    """Return an SFTP session opened on top of a fresh SSH connection.

    The SSH connection is made first (via ``ssh_client``) and the SFTP
    channel is opened on it, because that is where the connect timeout is
    applied. Arguments are forwarded unchanged to ``ssh_client``.
    """
    ssh = ssh_client(ip, user, passwd, auth)
    try:
        return ssh.open_sftp()
    except Exception:
        # The SFTP subsystem could not be started; release the connection.
        ssh.close()
        raise
def check_remote_folders(sftp, remote_path, remote_folders: list):
    """Recursively collect every sub-directory below ``remote_path``.

    Args:
        sftp: Object exposing ``listdir_attr(path)`` whose entries carry
            ``st_mode`` and ``filename`` (e.g. a paramiko SFTP client).
        remote_path: Remote directory to scan.
        remote_folders: Output list; each sub-directory path found is
            appended to it, children before their parent. ``remote_path``
            itself is not added.
    """
    # Join on a slash-trimmed base so that '/data/' does not yield '/data//x'.
    base = remote_path.rstrip('/')
    for entry in sftp.listdir_attr(remote_path):
        if not stat.S_ISDIR(entry.st_mode):
            continue
        child = base + '/' + entry.filename
        check_remote_folders(sftp, child, remote_folders)
        remote_folders.append(child)
def batch_sftp_get(ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
    """Download ``remote_path`` from every host in ``ip`` using 5 worker processes.

    Each host is handled by ``sftp_get`` in a worker process. The function
    waits for all downloads to finish; a failure on one host is printed in
    red and does not stop the others.
    """
    with multiprocessing.Pool(5) as pool:
        pending = [
            (host, pool.apply_async(sftp_get, (host, remote_path, local_path, user, passwd, auth)))
            for host in ip
        ]
        # Without fetching the results the function would return immediately
        # and any exception raised in a worker would be lost silently.
        for host, outcome in pending:
            try:
                outcome.get()
            except Exception as e:
                print('\033[31;1m%s:%s\033[0m' % (host, e))
- 5.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, json
def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
    """Run shell commands on many hosts concurrently and log the outcome.

    Args:
        ip: For ``cmd_type='one'`` an iterable of hosts; for
            ``cmd_type='many'`` a list of ``"host:command"`` or
            ``"host,command"`` strings (``:`` is tried first).
        cmd: Command run on every host when ``cmd_type='one'``.
        user, passwd, auth: Forwarded to ``self.ssh_exec``.
        cmd_type: ``'one'`` (same command everywhere) or ``'many'``.
        task_name: Label used in the result and in the log file name.

    Returns:
        A dict with ``task_name``, ``task_sn``, ``start_time``, ``cost_time``
        and ``results`` (``{host: {'success'|'failed': text}}``), or an error
        message string when the arguments are invalid.
    """
    # Build the full (host, command) list first so that nothing is started
    # when any entry turns out to be malformed.
    jobs = []
    if cmd_type == 'one':
        jobs = [(host, cmd) for host in ip]
    elif cmd_type == 'many':
        if not isinstance(ip, list):
            return 'ip的类型为%s, 请传入一个正确的类型' % type(ip)
        for item in ip:
            separator = ''
            if ':' in item:
                separator = ':'
            elif ',' in item:
                separator = ','
            if separator == '':
                return '请检查ip和命令间的分隔符'
            # Split once only: the command itself may contain the separator.
            host, command = item.split(separator, 1)
            jobs.append((host, command))
    else:
        return 'cmd_type不存在%s值, 请传入一个正确的参数' % cmd_type

    results = {}
    with ThreadPoolExecutor(self.workers) as pool:
        task = [pool.submit(self.ssh_exec, host, command, user, passwd, auth) for host, command in jobs]
        self.logger.debug('检查变量task:%s' % task)
        for future in as_completed(task):
            # ssh_exec returns "status:host:text"; the text may contain ':'.
            status, host, output = future.result().split(':', 2)
            results[host] = {status: output}
            if status == 'success':
                print('\033[32;1m%s\033[0m' % (host + ':' + output))
            elif status == 'failed':
                print('\033[31;1m%s\033[0m' % (host + ':' + output))

    json_results = {
        'task_name': task_name,
        'task_sn': self.task_sn,
        'start_time': self.now_time,
        'cost_time': '%.2fs' % (time.perf_counter() - self.s),
        'results': results
    }
    self.logger.info('json_results:%s' % json_results)
    # Persist the result as JSON so that it can be parsed again later.
    with open(self.log_path + 'task_%s_%s.log' % (task_name, self.task_sn), 'a', encoding='utf8') as f:
        json.dump(json_results, f, ensure_ascii=False)
    return json_results
import os
import json
def get_info(self, path):
    """Read ``path`` (UTF-8 text) and return its non-blank lines.

    Line endings are removed. Blank lines are skipped so that a trailing
    empty line in a host list does not become an empty host entry.

    Raises:
        ValueError: ``path`` does not exist.
    """
    # Lazy %-args keep the message a readable str in the log.
    self.logger.debug('接收参数path:%s', path)
    if not os.path.exists(path):
        self.logger.warning('%s不存在,请传入一个正确的目录' % path)
        raise ValueError('%s不存在,请传入一个正确的目录' % path)
    with open(path, 'r', encoding='utf8') as f:
        return [line.rstrip('\r\n') for line in f if line.strip()]
def log_analysis(filename):
    """Load the JSON document stored in ``filename`` and return it.

    Raises:
        ValueError: ``filename`` does not exist.
        TypeError: the file exists but cannot be read or parsed as JSON.
    """
    if not os.path.exists(filename):
        raise ValueError('该%s文件路径不存在,请传入一个正确的文件路径' % filename)
    try:
        with open(filename, 'r', encoding='utf8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
        print('%s无法解析该类型文件' % filename + ' ' + str(e))
        raise TypeError('%s无法解析该类型文件' % filename + ' ' + str(e)) from e
def show_log(self, filename, mode=None):
    """Print the per-host results stored in a task log file.

    Args:
        filename: Log file parsed with ``self.log_analysis``.
        mode: ``None`` prints ``host:output`` for every host;
            ``'success'`` prints only the hosts that succeeded;
            ``'failed'`` prints only the hosts that failed.
    """
    data: dict = self.log_analysis(filename)
    if not isinstance(data, dict):
        return
    for key, value in data["results"].items():
        # Each entry holds exactly one of the two status keys.
        if 'success' in value:
            status = 'success'
        elif 'failed' in value:
            status = 'failed'
        else:
            continue
        if mode == status:
            # NOTE(review): the statement here was lost from the source;
            # printing the bare host is inferred from the CLI help text.
            print(key)
        elif mode is None:
            print('%s:%s' % (key, value[status].replace('\r\n', '')))
import multiprocessing
import os
import re
import time
import stat
import json
import random
import logging
import asyncio
import argparse
import paramiko
class TaskManager:
    """Batch helper for SSH commands, SFTP transfers and ping checks.

    One instance represents one task run: it carries a random serial number
    (``task_sn``), the start time, and a debug log file named after both.
    """

    def __init__(self, timeout=10, workers=15, system='linux'):
        """Prepare key, log directories and the debug logger.

        Args:
            timeout: TCP connect timeout in seconds for SSH connections.
            workers: Thread count used by ``batch_ssh``.
            system: ``'linux'`` or ``'windows'``; selects the private-key
                path and the log directories.

        Raises:
            ValueError: ``system`` is neither ``'linux'`` nor ``'windows'``.
        """
        self.username = 'root'
        self.port = 22
        self.datetime = time.strftime("%Y-%m-%d", time.localtime())
        self.timeout = timeout
        self.workers = workers
        self.now_time = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
        self.s = time.perf_counter()
        self.task_sn = self.sn_random_generator()
        if system == 'linux':
            self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
            self.log_path = '/tmp/TaskManager/log/'
            self.log_debug_path = '/tmp/TaskManager/debug/'
        elif system == 'windows':
            self.pkey = paramiko.RSAKey.from_private_key_file(r'C:\Users\001\.ssh\id_rsa')
            self.log_path = r'D:\tmp\TaskManager\log\\'
            self.log_debug_path = r'D:\tmp\TaskManager\debug\\'
        else:
            # Otherwise the attributes above would be missing and the failure
            # would surface later as an unrelated AttributeError.
            raise ValueError('system 不支持%s, 请传入 linux 或 windows' % system)
        os.makedirs(self.log_path, exist_ok=True)
        os.makedirs(self.log_debug_path, exist_ok=True)
        self.logger = logging.getLogger(__name__)
        # UTF-8 is set explicitly because the log messages are Chinese.
        self.handler = logging.FileHandler(
            self.log_debug_path + '%s_%s.log' % (self.datetime, self.task_sn), encoding='utf8')
        self.formatter = logging.Formatter("%(asctime)s[%(levelname)s][%(funcName)s]%(message)s ")
        # Attach handler and formatter; without this nothing reaches the file.
        self.handler.setFormatter(self.formatter)
        self.logger.addHandler(self.handler)
        self.logger.setLevel(logging.DEBUG)

    def ssh_client(self, ip, user=None, passwd=None, auth='id_rsa'):
        """Open and return a connected paramiko SSHClient for ``ip``.

        ``auth='id_rsa'`` logs in as ``self.username`` with ``self.pkey``;
        ``auth='noAuth'`` logs in with ``user``/``passwd``. The caller is
        responsible for closing the returned client.

        Raises:
            NameError: ``auth`` is not a supported mode.
            ValueError: password mode without ``user`` or ``passwd``.
        """
        # Validate before touching the network.
        if auth not in ('id_rsa', 'noAuth'):
            raise NameError('不存在此%s 认证方式' % auth)
        if auth == 'noAuth' and (user is None or passwd is None):
            raise ValueError('传入的用户名密码不能为空')

        client = paramiko.SSHClient()
        # Unknown host keys are accepted automatically; convenient for bulk
        # operations, but it gives no protection against a spoofed host.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.logger.info('正在 SSH 连接%s', ip)
        try:
            if auth == 'id_rsa':
                client.connect(ip, self.port, self.username, pkey=self.pkey,
                               banner_timeout=60, timeout=self.timeout)
            else:
                # NOTE(review): the original carried a commented-out hint that
                # allow_agent=False, look_for_keys=False resolves
                # "No existing session" for password logins - confirm and enable.
                client.connect(ip, self.port, user, passwd, banner_timeout=60, timeout=self.timeout)
        except Exception:
            client.close()
            raise
        return client

    def ssh_exec(self, ip, cmd, user=None, passwd=None, auth='id_rsa'):
        """Run ``cmd`` on ``ip`` and return ``"success:<ip>:<output>"``.

        Any exception is converted into ``"failed:<ip>:<error>"`` instead of
        being raised, so that one bad host never aborts a batch.
        """
        ssh = None
        try:
            ssh = self.ssh_client(ip, user, passwd, auth)
            stdin, stdout, stderr = ssh.exec_command(command=cmd, get_pty=True)
            self.logger.debug('%s:stdin 输入:%s' % (ip, stdin))
            self.logger.debug('%s:stderr 错误:%s' % (ip, stderr))
            self.logger.debug('%s:stdout 输出:%s' % (ip, stdout))
            result = stdout.read().decode('utf-8')
            return 'success:' + ip + ':' + str(result)
        except Exception as e:
            return 'failed:' + ip + ':' + str(e)
        finally:
            # Always release the connection, whatever the outcome.
            if ssh is not None:
                ssh.close()

    def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
        """Run shell commands on many hosts concurrently and log the outcome.

        Args:
            ip: For ``cmd_type='one'`` an iterable of hosts; for
                ``cmd_type='many'`` a list of ``"host:command"`` or
                ``"host,command"`` strings (``:`` is tried first).
            cmd: Command run on every host when ``cmd_type='one'``.
            user, passwd, auth: Forwarded to ``ssh_exec``.
            cmd_type: ``'one'`` (same command everywhere) or ``'many'``.
            task_name: Label used in the result and in the log file name.

        Returns:
            A dict with ``task_name``, ``task_sn``, ``start_time``,
            ``cost_time`` and ``results``
            (``{host: {'success'|'failed': text}}``), or an error message
            string when the arguments are invalid.
        """
        # The password is masked: the debug log must not hold credentials.
        self.logger.debug('接收参数 ip:%s, cmd:%s, user:%s passwd:%s cmd_type:%s, task_name:%s' %
                          (ip, cmd, user, '******' if passwd else passwd, cmd_type, task_name))
        print('\033[35;1m-------------------Task is set up! Time:%s ----------------------\033[0m' % self.now_time)

        # Build the full (host, command) list first so that nothing is
        # started when any entry turns out to be malformed.
        jobs = []
        if cmd_type == 'one':
            jobs = [(host, cmd) for host in ip]
        elif cmd_type == 'many':
            if not isinstance(ip, list):
                self.logger.warning('ip 的类型为%s, 请传入一个正确的类型' % type(ip))
                return 'ip 的类型为%s, 请传入一个正确的类型' % type(ip)
            for item in ip:
                separator = ''
                if ':' in item:
                    separator = ':'
                elif ',' in item:
                    separator = ','
                self.logger.debug('检查变量 separator:%s' % separator)
                if separator == '':
                    self.logger.warning('separator:%s 不符合要求' % separator)
                    return '请检查 ip 和命令间的分隔符'
                # Split once only: the command itself may contain the separator.
                host, command = item.split(separator, 1)
                jobs.append((host, command))
        else:
            self.logger.warning('cmd_type 不存在%s 值, 请传入一个正确的参数' % cmd_type)
            return 'cmd_type 不存在%s 值, 请传入一个正确的参数' % cmd_type

        results = {}
        with ThreadPoolExecutor(self.workers) as pool:
            task = [pool.submit(self.ssh_exec, host, command, user, passwd, auth) for host, command in jobs]
            self.logger.debug('检查变量 task:%s' % task)
            for future in as_completed(task):
                # ssh_exec returns "status:host:text"; the text may contain ':'.
                status, host, output = future.result().split(':', 2)
                results[host] = {status: output}
                if status == 'success':
                    print('\033[32;1m%s\033[0m' % (host + ':' + output))
                elif status == 'failed':
                    print('\033[31;1m%s\033[0m' % (host + ':' + output))
        print('\033[35;1m---------------Task is finished! Cost:%.2fs Task_sn:%s-------------------\033[0m'
              % ((time.perf_counter() - self.s), self.task_sn))

        json_results = {
            'task_name': task_name,
            'task_sn': self.task_sn,
            'start_time': self.now_time,
            'cost_time': '%.2fs' % (time.perf_counter() - self.s),
            'results': results
        }
        self.logger.info('json_results:%s' % json_results)
        # Stored as JSON because show_log()/log_analysis() parse it back.
        with open(self.log_path + 'task_%s_%s.log' % (task_name, self.task_sn), 'a', encoding='utf8') as f:
            json.dump(json_results, f, ensure_ascii=False)
        return json_results

    def sftp_put(self, ip, local_path, remote_path, user=None, passwd=None, auth='id_rsa'):
        """Upload a local file or directory tree to ``ip``.

        ``remote_path`` is treated as a file path when it starts with ``/``
        and does not end with ``/``; otherwise it must be an existing remote
        directory. A local directory is recreated below the remote directory
        under its own name.

        Raises:
            NotADirectoryError: the local path is neither a file nor a
                directory, the target directory already exists remotely, or
                a directory is to be uploaded onto a file path.
            ValueError: the remote path check or a remote ``mkdir`` failed.
        """
        if os.path.isdir(local_path):
            local_path_type = 'directory'
        elif os.path.isfile(local_path):
            local_path_type = 'file'
        else:
            raise NotADirectoryError('本地路径%s 无效' % local_path)

        # SSH first, then SFTP on top: the connect timeout applies to SSH.
        ssh = self.ssh_client(ip, user, passwd, auth)
        try:
            sftp = ssh.open_sftp()
            try:
                if remote_path[0] == '/' and remote_path[-1] != '/':
                    remote_path_type = 'file'
                elif stat.S_ISDIR(sftp.stat(remote_path).st_mode):
                    remote_path_type = 'directory'
                else:
                    raise NotADirectoryError("请传入一个正确的远程路径:%s" % remote_path)
            except Exception as e:
                raise ValueError(ip, e) from e

            remote_base = remote_path if remote_path.endswith('/') else remote_path + '/'

            if local_path_type == 'directory' and remote_path_type == 'directory':
                top = os.path.abspath(local_path)
                parent = os.path.dirname(top)
                target_root = remote_base + os.path.basename(top) + '/'
                # Refuse to upload into a directory that is already there.
                try:
                    already_there = stat.S_ISDIR(sftp.stat(target_root).st_mode)
                except IOError:
                    already_there = False
                if already_there:
                    raise NotADirectoryError('%s 远程路径已存在此目录:%s' % (ip, target_root))
                # os.walk is top-down, so a parent is created before its
                # children. Paths are taken relative to the parent of the
                # uploaded directory, which keeps only its last component.
                for root, dirs, files in os.walk(top):
                    relative = os.path.relpath(root, parent).replace(os.sep, '/')
                    remote_dir = remote_base + relative
                    try:
                        sftp.mkdir(remote_dir)
                    except Exception as e:
                        raise ValueError(ip, e, remote_dir) from e
                    for file in files:
                        target = remote_dir + '/' + file
                        sftp.put(os.path.join(root, file), target)
                        print('%s put: %s' % (ip, target))
            elif local_path_type == 'file':
                if remote_path_type == 'directory':
                    file = os.path.basename(local_path)
                    sftp.put(local_path, remote_base + file)
                    print('%s put: %s' % (ip, remote_base + file))
                else:
                    sftp.put(local_path, remote_path)
                    print('%s put: %s' % (ip, remote_path))
            else:
                raise NotADirectoryError('请检查传入的本地路径:%s 和远程路径:%s 是否正确' % (local_path, remote_path))
        finally:
            ssh.close()

    def sftp_get(self, ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
        """Download a remote file or directory tree from ``ip``.

        Everything is stored below ``<local_path>/<ip>/``. For a remote
        directory the full remote path is mirrored underneath that folder.

        Raises:
            NotADirectoryError: ``local_path`` does not exist, or
                ``remote_path`` cannot be inspected.
            ValueError: ``local_path`` exists but is not a directory.
        """
        # Check the local side before opening any connection.
        if not os.path.exists(local_path):
            raise NotADirectoryError('本地路径:%s 不存在,请传入一个正确的本地路径' % local_path)
        if not os.path.isdir(local_path):
            raise ValueError('本地路径:%s 错误,请传入一个正确的目录路径' % local_path)

        client = self.ssh_client(ip, user, passwd, auth)
        try:
            sftp = client.open_sftp()
            try:
                remote_is_dir = stat.S_ISDIR(sftp.stat(remote_path).st_mode)
            except Exception as e:
                raise NotADirectoryError('远程路径:%s 不存在' % remote_path) from e

            host_dir = os.path.join(local_path, ip)
            if remote_is_dir:
                folders = []
                self.check_remote_folders(sftp=sftp, remote_path=remote_path, remote_folders=folders)
                folders.append(remote_path)  # the top-level directory itself
                for folder in folders:
                    local_download_path = os.path.join(host_dir, folder.strip('/'))
                    os.makedirs(local_download_path, exist_ok=True)
                    for file in sftp.listdir_attr(folder):
                        if stat.S_ISDIR(file.st_mode):
                            continue
                        destination = os.path.join(local_download_path, file.filename)
                        print('%s get:%s' % (ip, destination))
                        sftp.get(folder.rstrip('/') + '/' + file.filename, destination)
            else:
                remote_file = remote_path.split('/')[-1]
                os.makedirs(host_dir, exist_ok=True)
                destination = os.path.join(host_dir, remote_file)
                print('%s get: %s' % (ip, destination))
                sftp.get(remote_path, destination)
        finally:
            client.close()

    def _run_transfers(self, worker, ip, args, max_workers):
        """Run ``worker(host, *args)`` for every host in a thread pool.

        A failure on one host is logged, printed in red and collected; it
        does not stop the others. Returns ``{host: exception}``.
        """
        failures = {}
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            pending = {pool.submit(worker, host, *args): host for host in ip}
            for future in as_completed(pending):
                host = pending[future]
                try:
                    future.result()
                except Exception as e:
                    failures[host] = e
                    self.logger.warning('%s:%s', host, e)
                    print('\033[31;1m%s:%s\033[0m' % (host, e))
        return failures

    def batch_sftp_get(self, ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
        """Download ``remote_path`` from every host in ``ip`` (5 at a time).

        Threads are used instead of a process pool: a process pool has to
        pickle ``self`` (including its log handler) for every call, and since
        the results were never fetched that failure went unnoticed.

        Returns:
            ``{host: exception}`` for the hosts that failed.
        """
        return self._run_transfers(self.sftp_get, ip, (remote_path, local_path, user, passwd, auth), 5)

    def batch_sftp_put(self, ip, local_path, remote_path, user=None, passwd=None, auth='id_rsa'):
        """Upload ``local_path`` to every host in ``ip`` (10 at a time).

        Returns:
            ``{host: exception}`` for the hosts that failed.
        """
        return self._run_transfers(self.sftp_put, ip, (local_path, remote_path, user, passwd, auth), 10)

    def check_remote_folders(self, sftp, remote_path, remote_folders: list):
        """Recursively append every sub-directory of ``remote_path``.

        Children are appended before their parent; ``remote_path`` itself is
        not added to ``remote_folders``.
        """
        # Join on a slash-trimmed base so '/data/' does not yield '/data//x'.
        base = remote_path.rstrip('/')
        for entry in sftp.listdir_attr(remote_path):
            if not stat.S_ISDIR(entry.st_mode):
                continue
            child = base + '/' + entry.filename
            self.check_remote_folders(sftp, child, remote_folders)
            remote_folders.append(child)

    def get_info(self, path):
        """Read ``path`` (UTF-8 text) and return its non-blank lines.

        Blank lines are skipped so that a trailing empty line in a host list
        does not become an empty host entry.

        Raises:
            ValueError: ``path`` does not exist.
        """
        self.logger.debug('接收参数 path:%s', path)
        if not os.path.exists(path):
            self.logger.warning('%s 不存在,请传入一个正确的目录' % path)
            raise ValueError('%s 不存在,请传入一个正确的目录' % path)
        with open(path, 'r', encoding='utf8') as f:
            return [line.rstrip('\r\n') for line in f if line.strip()]

    @staticmethod
    def sn_random_generator():
        """Return a 12-character serial: 7 distinct lowercase letters followed by 5 distinct digits."""
        letters = random.sample('abcdefghijklmnopqrstuvwxyz', 7)
        digits = random.sample('0123456789', 5)
        return ''.join(letters + digits)

    @staticmethod
    def log_analysis(filename):
        """Load the JSON document stored in ``filename`` and return it.

        Raises:
            ValueError: ``filename`` does not exist.
            TypeError: the file cannot be read or parsed as JSON.
        """
        if not os.path.exists(filename):
            raise ValueError('该%s 文件路径不存在,请传入一个正确的文件路径' % filename)
        try:
            with open(filename, 'r', encoding='utf8') as f:
                return json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
            print('%s 无法解析该类型文件' % filename + ' ' + str(e))
            raise TypeError('%s 无法解析该类型文件' % filename + ' ' + str(e)) from e

    def show_log(self, filename, mode=None):
        """Print the per-host results stored in a task log file.

        ``mode=None`` prints ``host:output`` for every host; ``'success'``
        or ``'failed'`` prints only the hosts with that status.
        """
        data: dict = self.log_analysis(filename)
        if not isinstance(data, dict):
            return
        for key, value in data["results"].items():
            if 'success' in value:
                status = 'success'
            elif 'failed' in value:
                status = 'failed'
            else:
                continue
            if mode == status:
                # NOTE(review): the statement here was lost from the source;
                # printing the bare host is inferred from the CLI help text.
                print(key)
            elif mode is None:
                print('%s:%s' % (key, value[status].replace('\r\n', '')))

    @staticmethod
    async def ping(ip):
        """Ping ``ip`` (5 packets, 5 s deadline) and print the raw output."""
        pro = await asyncio.create_subprocess_exec('ping', '-c', '5', '-w', '5', ip, stdout=asyncio.subprocess.PIPE)
        # communicate() returns (stdout, stderr); only stdout is piped.
        stdout, _ = await pro.communicate()
        # NOTE(review): the print was lost from the source; ping_analysis()
        # parses this output after it has been redirected to a file.
        print(stdout.decode('utf8', errors='replace'))

    async def run(self, ip):
        """Ping every host in ``ip`` concurrently."""
        # gather() accepts coroutines directly and tolerates an empty list.
        await asyncio.gather(*(self.ping(i) for i in ip))

    def start(self, ip):
        """Run the concurrent ping of ``ip`` to completion and print the elapsed time."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        start_time = time.time()
        try:
            loop.run_until_complete(self.run(ip))
        finally:
            loop.close()
        print('总共耗时: %.2f' % (time.time() - start_time))

    @staticmethod
    def ping_analysis(filename):
        """Extract the reachable IPs from saved ping output.

        A host counts as reachable when a line containing ``64 bytes`` also
        contains a dotted IPv4 address.

        Returns:
            The list of distinct IPs in first-seen order (each one is also
            printed), or an explanatory message string when none was found.
        """
        with open(filename, 'r', encoding='utf8') as f:
            data = [line.replace('\n', '') for line in f]
        ip_list = []
        for i in data:
            if '64 bytes' in i:
                ip_data = re.search(r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+', i)
                if ip_data:
                    ip_list.append(ip_data.group())
        if len(ip_list) > 0:
            # dict.fromkeys de-duplicates while keeping a stable order.
            ip = list(dict.fromkeys(ip_list))
            for j in ip:
                print(j)
            return ip
        print('所有 ip 均不通,或者传入的文件有误')
        return '所有 ip 均不通,或者传入的文件有误'
if __name__ == '__main__':
    # Command-line entry point: "path" is either a host-list file or a task
    # log file, depending on the option used.
    parser = argparse.ArgumentParser(description='TaskManager v1.3.9 build by hrj')
    parser.add_argument('path', help='文件路径')
    parser.add_argument('-a', action='store_true', help='多个 ip 执行不同的命令')
    parser.add_argument('-b', action='store', help='多个 ip 执行相同命令', metavar='[shell 命令]')
    parser.add_argument('-l', action='store_true', help='解析执行命令生成的日志')
    parser.add_argument('-ls', action='store_true', help='获取执行成功的 ip')
    parser.add_argument('-lf', action='store_true', help='获取执行失败的 ip')
    parser.add_argument('-p', action='store_true', help='异步 ping 多个 ip')
    parser.add_argument('-q', action='store_true', help='解析异步 ping 的能通的 ip')
    parser.add_argument('-put', nargs=2, help='多个 ip 批量 sftp 上传相同的文件', metavar=('[本地路径]', '[远程路径]'))
    parser.add_argument('-get', nargs=2, help='多个 ip 批量 sftp 下载相同的文件', metavar=('[远程路径]', '[本地路径]'))
    parser.add_argument('-auth', nargs=2, help='ssh/sftp 用户名、密码认证', metavar=('[用户名]', '[密码]'))
    args = parser.parse_args()

    # Created only after argument parsing, so that "-h" and usage errors do
    # not depend on the private key file being present.
    taskManager = TaskManager()

    # The password given with -auth is masked before it reaches the log.
    logged_args = dict(vars(args))
    if logged_args.get('auth'):
        logged_args['auth'] = [logged_args['auth'][0], '******']
    taskManager.logger.debug('接受参数:%s', logged_args)

    # ssh
    username = None
    password = None
    auth_type = 'id_rsa'
    file_info = taskManager.get_info(args.path)
    # ssh authentication: -auth switches from key login to user/password
    if args.auth:
        username = args.auth[0]
        password = args.auth[1]
        auth_type = 'noAuth'

    if args.a:
        taskManager.batch_ssh(ip=file_info, cmd_type='many', user=username, passwd=password, auth=auth_type)
    elif args.b:
        taskManager.batch_ssh(ip=file_info, cmd=args.b, user=username, passwd=password, auth=auth_type)
    # sftp
    elif args.put:
        taskManager.batch_sftp_put(
            ip=file_info, local_path=args.put[0], remote_path=args.put[1],
            user=username, passwd=password, auth=auth_type)
    elif args.get:
        # Downloads run one host after another here; the original author
        # noted that the multi-process batch variant returned instantly
        # without reporting errors.
        for ip in file_info:
            taskManager.sftp_get(
                ip=ip, remote_path=args.get[0], local_path=args.get[1], user=username, passwd=password,
                auth=auth_type)
    # log parsing
    elif args.l:
        taskManager.show_log(args.path)
    elif args.ls:
        taskManager.show_log(args.path, mode='success')
    elif args.lf:
        taskManager.show_log(args.path, mode='failed')
    # asynchronous ping
    elif args.p:
        taskManager.start(file_info)
    elif args.q:
        taskManager.ping_analysis(args.path)
- path为IP地址文件路径时,使用-a , -b , -get , -put, -p。-auth仅支持配合前4个参数一起使用。
- path为日志文件(/tmp/TaskManager/log/task_default_xxxxxx.log)路径时,使用-l, -ls, -lf。
- 当使用-p时需要重定向到文件才能用-q解析文件得到ping通的ip。
- 批量执行命令时,命令行末尾会有一个Task_sn:xxxxxxxxx这个是日志文件的名称不重复的部分,便于去找日志(感觉有点蹩脚,如果有好的办法请告诉我)。
可以配合 grep,awk 等命令精准过滤。
个人认为 Python 在初中级运维工作中的性质更像是工具,以提升工作效率、减少管理成本为主。可以从当前繁琐的工作中解脱出来,去探索更有价值的事情。python 本质上并不会减少故障的产生,所以在不同的阶段合理利用自身掌握的知识解决当前最重要的痛点,千万不要本末倒置。