深圳幻海软件技术有限公司 欢迎您!

纯干货!Python 在运维中的应用 :批量 ssh/sftp

2023-02-28

日常工作中需要大量、频繁地使用ssh到服务器查看、拉取相关的信息或者对服务器进行变更。目前公司大量使用的shell,但是随着逻辑的复杂化、脚本管理的精细化,shell已经不满足日常需求,于是我尝试整合工作中的需求,制作适合的工具。由于管理制度的缺陷,我以工作流程为核心思考适合自己的运维方式,提升工作

日常工作中需要大量、频繁地使用ssh到服务器查看、拉取相关的信息或者对服务器进行变更。目前公司大量使用的shell,但是随着逻辑的复杂化、脚本管理的精细化,shell已经不满足日常需求,于是我尝试整合工作中的需求,制作适合的工具。 由于管理制度的缺陷,我以工作流程为核心思考适合自己的运维方式,提升工作效率,把时间留给更有价值的事情。 完整代码在最后,请大家参考。

环境:

  • 生产:4000+物理服务器,近 3000 台虚拟机。
  • 开发环境:python3.6、redhat7.9,除了paramiko为第三方模块需要自己安装,其他的直接import即可。

主要应用方向:

  1. 配置变更:例如服务器上线时需要批量校正系统分区容量、及挂载数据盘。
  2. 配置信息查询过滤:例如过滤防火墙规则、过滤网卡配置。
  3. 存活检测:设置定时任务,定时轮询服务器的 ssh 状态是否正常。
  4. 文件传输:多个 ip 同时传输目录/文件。

基本原则:

批量执行操作是一把双刃剑。批量执行操作可以提升工作效率,但是随之而来的风险不可忽略。

风险案例如下:

挂载很多数据盘,通常先格式化硬盘,再挂载数据盘,最后再写入将开机挂载信息写入/etc/fstab文件。在批量lsblk检查硬盘信息的时候发现有的系统盘在/sda有的在/sdm,如果不事先检查机器相关配置是否一致直接按照工作经验去执行批量操作,会很容易造成个人难以承受的灾难。

在执行批量操作时按照惯例:格式化硬盘->挂载->开机挂载的顺序去执行,假设有的机器因为某些故障导致格式化硬盘没法正确执行。在处理这类问题的时候通常会先提取出失败的ip,并再按照惯例执行操作。运维人员会很容易忽略开机挂载的信息已经写过了,导致复写(这都是血和泪的教训)。

所以,为了避免故障,提升工作效率,我认为应当建立团队在工作上的共识,应当遵守以下原则:

  1. 批量操作前应当准备回退方案。
  2. 批量操作前作前先确定检查目标服务器相关的配置的一致性。
  3. 批量操作时应当把重复执行会影响系统的操作和不影响的分开执行。
  4. 批量操作后应当再次验证操作结果是否符合预期。

当然,代码的规范也应当重视起来,不仅是为了便于审计,同时也需要便于溯源。我认为应当注意以下几点:

  1. 关键方法一定要记录传入的参数以及执行后的结果。
  2. 为了避免方法返回值不符合预期,该抛异常的地方一定要抛。
  3. 优化代码,删去不必要的逻辑分支和尽量不要写重复的代码,使代码看起来整洁。
  4. 程序的执行情况、结果一定要保留日志。

技术难点

1、ssh no existing session,sftp超时时间设置:

在代码无错的情况下大量ip出现No existing session,排查后定位在代码的写法上,下面是一个正确的示例。由于最开始没考虑到ssh连接的几种情况导致了重写好几遍。另外sftp的实例貌似不能直接设置连接超时时间,所以我采用了先建立ssh连接再打开sftp的方法。

import paramiko

username = 'root'
port = 22
pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')  # 导入公钥
timeout=10


def ssh_client( ip, user=None, passwd=None, auth='id_rsa'):
    client = paramiko.SSHClient()  # 实例化ssh
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # 配置ssh互信
    if auth == 'id_rsa':  # 公钥认证
        client.connect(ip, port, username, pkey=pkey, banner_timeout=60, timeout=timeout)
    elif auth == 'noAuth':  # 用户名密码认证
        if user is not None and passwd is not None:
            client.connect(ip, port, user, passwd, banner_timeout=60, timeout=timeout)
        else:
            raise ValueError('传入的用户名密码不能为空')
    else:
        raise NameError('不存在此%s认证方式' % auth)
    return client


def sftp_client(ip, user=None, passwd=None, auth='id_rsa'):
    ssh = ssh_client(ip, user, passwd, auth)
    sftp = ssh.open_sftp()
    return sftp
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.

2、sftp中的get()和put()方法仅能传文件,不支持直接传目录:

不能直接传目录,那换个思路,遍历路径中的目录和文件,先创建目录再传文件就能达到一样的效果了。在paramiko的sftp中sftp.listdir_attr()方法可以获取远程路径中的文件、目录信息。那么我们可以写一个递归来遍历远程路径中的所有文件和目录(传入一个列表是为了接收递归返回的值)。

def check_remote_folders(sftp, remote_path, remote_folders: list):
        # 递归遍历远程路径中的目录,并添加到remote_folders中
        for f in sftp.listdir_attr(remote_path):
            # 检查路径状态是否为目录
            if stat.S_ISDIR(f.st_mode):
                # 递归调用自己
                check_remote_folders(sftp, remote_path + '/' + f.filename, remote_folders)
                #  添加遍历到的目录信息到列表中
                remote_folders.append(remote_path + '/' + f.filename)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

python自带的os模块中的os.walk()方法可以遍历到本地路径中的目录和文件。

  local_files_path = []
        local_directory = []
        for root, dirs, files in os.walk(local_path):
            local_directory.append(root)
            for file in files:if root[-1] != '/':
                    local_files_path.append(root + '/' + file)
                elif root[-1] == '/':
                    local_files_path.append(root + file)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

3、多线程多个ip使用sftp.get()方法时无法并发。

改成多进程即可。

 def batch_sftp_get(ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
        pool = multiprocessing.Pool(5)for i in ip:
            pool.apply_async(sftp_get, (i, remote_path, local_path, user, passwd, auth,))
        pool.close()
        pool.join()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

4、多个ip需要执行相同命令或不同的命令。

由于是日常使用的场景不会很复杂,所以借鉴了ansible的playbook,读取提前准备好的配置文件即可,然后再整合到之前定义的ssh函数中。

# 配置文件大概是这样
192.168.0.100:df -Th | grep xfs; lsblk | grep disk | wc -l
192.168.0.101:ip a | grep team | wc -l
192.168.0.102:route -n
...
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, json


def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
    pool = ThreadPoolExecutor(self.workers)
    task = []
    if cmd_type == 'one':
        task = [pool.submit(self.ssh_exec, i, cmd, user, passwd, auth) for i in ip]
    elif cmd_type == 'many':
        if isinstance(ip, list):
            for i in ip:
                separator = ''if ':' in i:
                    separator = ':'elif ',' in i:
                    separator = ','if separator != '':
                    data = i.split(separator)
                    task.append(pool.submit(self.ssh_client, data[0], data[1], user, passwd))
                else:
                    return '请检查ip和命令间的分隔符'else:
            return 'ip的类型为%s, 请传入一个正确的类型' % type(ip)
    else:
        return 'cmd_type不存在%s值, 请传入一个正确的参数' % cmd_type
    self.logger.debug('检查变量task:%s' % task)
    results = {}
    for future in as_completed(task):
        res = future.result().split(':')
        results[res[1]] = {res[0]: res[2]}
        if 'success' in future.result():
            print('\033[32;1m%s\033[0m' % future.result().replace('success:', ''))
        elif 'failed' in future.result():
            print('\033[31;1m%s\033[0m' % future.result().replace('failed:', ''))
    pool.shutdown()
    json_results = {
        'task_name': task_name,
        'task_sn': self.task_sn,
        'start_time': self.now_time,
        'cost_time': '%.2fs' % (time.perf_counter() - self.s),
        'results': results
    }
    self.logger.info('json_results:%s' % json_results)
    with open(self.log_path + 'task_%s_%s.log' % (task_name, self.task_sn), 'a') as f:
        f.write(json.dumps(json_results))
    return json_results
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.

同时,我们还衍生出一个需求,既然都要读取配置,那同样也可以提前把ip地址准备在文件里。正好也能读取我们返回的执行程序的结果。

import os
import json


def get_info(self, path):
    self.logger.debug('接收参数path:%s'.encode('utf8') % path.encode('utf8'))
    if os.path.exists(path):
        info_list = [i.replace('\n', '') for i in open(path, 'r', encoding='utf8').readlines()]
        return info_list
    else:
        self.logger.warning('%s不存在,请传入一个正确的目录' % path)
        raise ValueError('%s不存在,请传入一个正确的目录' % path)


def log_analysis(filename):if os.path.exists(filename):
        try:
            data = json.load(open(filename, 'r', encoding='utf8'))
            return data
        except Exception as e:
            print('%s无法解析该类型文件' % filename + ' ' + str(e))
            raise TypeError('%s无法解析该类型文件' % filename + ' ' + str(e))
    else:
        raise ValueError('该%s文件路径不存在,请传入一个正确的文件路径' % filename)


def show_log(self, filename, mode=None):
    data: dict = self.log_analysis(filename)
    if isinstance(data, dict):
        for key, value in data["results"].items():
            if 'success' in value.keys():
                if mode == 'success':
                    print(key)
                elif mode is None:
                    print('%s:%s' % (key, value['success'].replace('\r\n', '')))
            elif 'failed' in value.keys():
                if mode == 'failed':
                    print(key)
                elif mode is None:
                    print('%s:%s' % (key, value['failed'].replace('\r\n', '')))
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.

完整代码展示:

from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
import os
import re
import time
import stat
import json
import random
import logging
import asyncio
import argparse
import paramiko


class TaskManager:def __init__(self, timeout=10, workers=15, system='linux'):
        self.username = 'root'
        self.port = 22
        self.datetime = time.strftime("%Y-%m-%d", time.localtime())
        self.timeout = timeout
        self.workers = workers
        self.now_time = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
        self.s = time.perf_counter()
        self.task_sn = self.sn_random_generator()

        if system == 'linux':
            self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
            self.log_path = '/tmp/TaskManager/log/'
            self.log_debug_path = '/tmp/TaskManager/debug/'elif system == 'windows':
            self.pkey = paramiko.RSAKey.from_private_key_file(r'C:\Users\001\.ssh\id_rsa')
            self.log_path = r'D:\tmp\TaskManager\log\\'
            self.log_debug_path = r'D:\tmp\TaskManager\debug\\'

        if os.path.exists(self.log_path) is False:
            os.makedirs(self.log_path, exist_ok=True)
        if os.path.exists(self.log_debug_path) is False:
            os.makedirs(self.log_debug_path, exist_ok=True)

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(level=logging.DEBUG)
        self.handler = logging.FileHandler(self.log_debug_path + '%s_%s.log' % (self.datetime, self.task_sn))
        self.formatter = logging.Formatter("%(asctime)s[%(levelname)s][%(funcName)s]%(message)s ")
        self.handler.setFormatter(self.formatter)
        self.logger.addHandler(self.handler)
        self.logger.info('初始化完成'.encode(encoding='utf8'))

    def ssh_client(self, ip, user=None, passwd=None, auth='id_rsa'):
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if auth == 'id_rsa':
            self.logger.info('正在 SSH 连接%s'.encode('utf8') % str(ip).encode('utf8'))
            client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=60, timeout=self.timeout)
        elif auth == 'noAuth':
            if user is not None and passwd is not None:
                client.connect(ip, self.port, user, passwd, banner_timeout=60, timeout=self.timeout)
                #  allow_agent=False, look_for_keys=False# No existing session 解决办法 else:raise ValueError('传入的用户名密码不能为空')else:raise NameError('不存在此%s 认证方式' % auth)return client

    def ssh_exec(self, ip, cmd, user=None, passwd=None, auth='id_rsa'):try:
            ssh = self.ssh_client(ip, user, passwd, auth)
            stdin, stdout, stderr = ssh.exec_command(command=cmd, get_pty=True)
            self.logger.debug('%s:stdin 输入:%s' % (ip, stdin))
            self.logger.debug('%s:stderr 错误:%s' % (ip, stderr))
            self.logger.debug('%s:stdout 输出:%s' % (ip, stdout))
            result = stdout.read().decode('utf-8')
            ssh.close()
            return 'success:' + ip + ':' + str(result)
        except Exception as e:
            return 'failed:' + ip + ':' + str(e)

    def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
        self.logger.debug('接收参数 ip:%s, cmd:%s, user:%s passwd:%s cmd_type:%s, task_name:%s' %
                          (ip, cmd, user, passwd, cmd_type, task_name))
        print('\033[35;1m-------------------Task is set up! Time:%s ----------------------\033[0m' % self.now_time)
        pool = ThreadPoolExecutor(self.workers)
        task = []
        if cmd_type == 'one':
            task = [pool.submit(self.ssh_exec, i, cmd, user, passwd, auth) for i in ip]
        elif cmd_type == 'many':
            if isinstance(ip, list):
                for i in ip:
                    separator = ''if ':' in i:
                        separator = ':'elif ',' in i:
                        separator = ','
                    self.logger.debug('检查变量 separator:%s' % separator)
                    if separator != '':
                        data = i.split(separator)
                        task.append(pool.submit(self.ssh_client, data[0], data[1], user, passwd))
                    else:
                        self.logger.warning('separator:%s 不符合要求' % separator)
                        return '请检查 ip 和命令间的分隔符'else:
                self.logger.warning('ip 的类型为%s, 请传入一个正确的类型' % type(ip))
                return 'ip 的类型为%s, 请传入一个正确的类型' % type(ip)
        else:
            self.logger.warning('cmd_type 不存在%s 值, 请传入一个正确的参数' % cmd_type)
            return 'cmd_type 不存在%s 值, 请传入一个正确的参数' % cmd_type
        self.logger.debug('检查变量 task:%s' % task)
        results = {}
        for future in as_completed(task):
            res = future.result().split(':')
            results[res[1]] = {res[0]: res[2]}
            if 'success' in future.result():
                print('\033[32;1m%s\033[0m' % future.result().replace('success:', ''))
            elif 'failed' in future.result():
                print('\033[31;1m%s\033[0m' % future.result().replace('failed:', ''))
        pool.shutdown()
        print('\033[35;1m---------------Task is finished! Cost:%.2fs Task_sn:%s-------------------\033[0m'
              % ((time.perf_counter() - self.s), self.task_sn))
        json_results = {
            'task_name': task_name,
            'task_sn': self.task_sn,
            'start_time': self.now_time,
            'cost_time': '%.2fs' % (time.perf_counter() - self.s),
            'results': results
        }
        self.logger.info('json_results:%s' % json_results)
        with open(self.log_path + 'task_%s_%s.log' % (task_name, self.task_sn), 'a') as f:
            f.write(json.dumps(json_results))
        return json_results

    def sftp_put(self, ip, local_path, remote_path, user=None, passwd=None, auth='id_rsa'):# 路径类型:目录、文件、不存在(抛异常)# 本地路径类型检查 if os.path.exists(local_path):if os.path.isdir(local_path):
                local_path_type = 'directory'elif os.path.isfile(local_path):
                local_path_type = 'file'else:
                raise NotADirectoryError('本地路径%s 无效' % local_path)
        else:
            raise NotADirectoryError('本地路径%s 无效' % local_path)

        # 启动 ssh 连接并打开 sftp 服务(直接启动 sftp 连接无法设置超时时间)
        ssh = self.ssh_client(ip, user, passwd, auth)
        sftp = ssh.open_sftp()

        # 远程路径类型类型 try:if remote_path[0] == '/' and remote_path[-1] != '/':
                remote_path_type = 'file'elif stat.S_ISDIR(sftp.stat(remote_path).st_mode):  # 检查远程路径是否为目录
                remote_path_type = 'directory'else:
                raise NotADirectoryError("请传入一个正确的远程路径:%s" % remote_path)
        except Exception as e:
            raise ValueError(ip, e)

        # 传目录 if local_path_type == 'directory' and remote_path_type == 'directory':# 若本地路径为多级目录,则保留末级路径。
            directory_split = [i for i in local_path.split('/') if i != '']

            # 检查远程路径是否存在该本地末级目录 try:if stat.S_ISDIR(sftp.stat(remote_path + directory_split[-1] + '/').st_mode):raise NotADirectoryError('%s 远程路径已存在此目录:%s' % (ip, remote_path + directory_split[-1] + '/'))except:pass

            # 多余的目录信息
            redundant_directory = ''if len(directory_split) > 1:
                redundant_directory = '/' + '/'.join(directory_split[:-1]) + '/'  # 过滤上级目录名称

            # 遍历本地路径下所有的文件路径, 目录。
            local_files_path = []
            local_directory = []
            for root, dirs, files in os.walk(local_path):
                local_directory.append(root)
                for file in files:
                    if root[-1] != '/':
                        local_files_path.append(root + '/' + file)
                    elif root[-1] == '/':
                        local_files_path.append(root + file)

            # 在远程路径上创建对应的目录 for directory in local_directory:# 目录切割使用 replace 方法不是很严谨,镜像目录会出问题,留在这里抛个异常,后续再改。try:
                    sftp.mkdir(remote_path + directory.replace(redundant_directory, ''))
                except Exception as e:
                    raise ValueError(ip, e, remote_path + directory.replace(redundant_directory, ''))

            # 上传文件 for local_file in local_files_path:
                sftp.put(local_file, remote_path + local_file.replace(redundant_directory, ''))
                print('%s put: %s' % (ip, remote_path + local_file.replace(redundant_directory, '')))

        # 传文件 elif local_path_type == 'file':if remote_path_type == 'directory':
                file = local_path.split('/')[-1]
                sftp.put(local_path, remote_path + file)
                print('%s put: %s' % (ip, remote_path + file))
            elif remote_path_type == 'file':  # 远程路径为文件格式
                sftp.put(local_path, remote_path)
                print('%s put: %s' % (ip, remote_path))
        else:
            raise NotADirectoryError('请检查传入的本地路径:%s 和远程路径:%s 是否正确' % (local_path, remote_path))

        sftp.close()
        ssh.close()

    def sftp_get(self, ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
        client = self.ssh_client(ip, user, passwd, auth)
        sftp = client.open_sftp()

        # 检查本地路径是否存在,再检查是否为目录 if os.path.exists(local_path):if os.path.isdir(local_path):
                local_path_type = 'directory'else:
                raise ValueError('本地路径:%s 错误,请传入一个正确的目录路径' % local_path)
        else:
            raise NotADirectoryError('本地路径:%s 不存在,请传入一个正确的本地路径' % local_path)

        # 检查远程路径是否为文件或者目录 try:if stat.S_ISDIR(sftp.stat(remote_path).st_mode):
                remote_path_type = 'directory'else:
                remote_path_type = 'file'except:
            raise NotADirectoryError('远程路径:%s 不存在' % remote_path)

        # 如果远程路径和本地路径都是目录 if remote_path_type == 'directory' and local_path_type == 'directory':
            folders = []  # 提前创建一个列表,用来接收递归遍历出的目录路径。#  递归遍历远程路径中的所有目录
            self.check_remote_folders(sftp=sftp, remote_path=remote_path, remote_folders=folders)
            folders.append(remote_path)  # 别忘了一级目录

            # 遍历远程路径中的所有目录 for folder in folders:
                local_download_path = local_path + '/%s/' % ip + folder + '/'# 在本地新建文件夹
                os.makedirs(local_download_path, exist_ok=True)
                files = sftp.listdir_attr(folder)  # 获取远程目录内的文件信息# 遍历文件路径 for file in files:if stat.S_ISDIR(file.st_mode) is False:
                        file_path = file.filename
                        print('%s get:%s' % (ip, local_download_path + '/' + file_path))
                        sftp.get(folder + '/' + file_path, local_download_path + '/' + file_path)
        # 如果远程路径为文件,本地路径为目录 elif remote_path_type == 'file' and local_path_type == 'directory':
            remote_file = remote_path.split('/')[-1]
            local_download_path = local_path + '%s/' % ip
            os.makedirs(local_download_path, exist_ok=True)
            print('%s get: %s' % (ip, local_download_path + remote_file))
            sftp.get(remote_path, local_download_path + remote_file)

    def batch_sftp_get(self, ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
        pool = multiprocessing.Pool(5)
        for i in ip:
            pool.apply_async(self.sftp_get, (i, remote_path, local_path, user, passwd, auth,))
        pool.close()
        pool.join()

    def batch_sftp_put(self, ip, local_path, remote_path, user=None, passwd=None, auth='id_rsa'):
        pool = ThreadPoolExecutor(max_workers=10)
        task = [pool.submit(self.sftp_put, i, local_path, remote_path, user, passwd, auth) for i in ip]
        for future in as_completed(task):
            future.result()

    def check_remote_folders(self, sftp, remote_path, remote_folders: list):# 递归遍历远程路径中的目录,并添加到 remote_folders  for f in sftp.listdir_attr(remote_path):# 检查路径状态是否为目录 if stat.S_ISDIR(f.st_mode):# 递归调用自己
                self.check_remote_folders(sftp, remote_path + '/' + f.filename, remote_folders)
                #  添加遍历到的目录信息到列表中
                remote_folders.append(remote_path + '/' + f.filename)

    def get_info(self, path):
        self.logger.debug('接收参数 path:%s'.encode('utf8') % path.encode('utf8'))
        if os.path.exists(path):
            info_list = [i.replace('\n', '') for i in open(path, 'r', encoding='utf8').readlines()]
            return info_list
        else:
            self.logger.warning('%s 不存在,请传入一个正确的目录' % path)
            raise ValueError('%s 不存在,请传入一个正确的目录' % path)

    @staticmethoddef sn_random_generator():
        lower_letters = random.sample([chr(i + ord('a')) for i in range(26)], 7)
        num = random.sample([str(i) for i in range(10)], 5)
        passwd = lower_letters + num
        random.shuffle(passwd)
        return ''.join(passwd)

    @staticmethoddef log_analysis(filename):if os.path.exists(filename):try:
                data = json.load(open(filename, 'r', encoding='utf8'))
                return data
            except Exception as e:
                print('%s 无法解析该类型文件' % filename + ' ' + str(e))
                raise TypeError('%s 无法解析该类型文件' % filename + ' ' + str(e))
        else:
            raise ValueError('该%s 文件路径不存在,请传入一个正确的文件路径' % filename)

    def show_log(self, filename, mode=None):
        data: dict = self.log_analysis(filename)
        if isinstance(data, dict):
            for key, value in data["results"].items():
                if 'success' in value.keys():
                    if mode == 'success':
                        print(key)
                    elif mode is None:
                        print('%s:%s' % (key, value['success'].replace('\r\n', '')))
                elif 'failed' in value.keys():
                    if mode == 'failed':
                        print(key)
                    elif mode is None:
                        print('%s:%s' % (key, value['failed'].replace('\r\n', '')))

    @staticmethodasync def ping(ip):
        pro = await asyncio.create_subprocess_exec('ping', '-c', '5', '-w', '5', ip, stdout=asyncio.subprocess.PIPE)
        stdout = await pro.communicate()
        print(stdout[0].decode('utf8'))

    async def run(self, ip):
        task = [self.ping(i) for i in ip]
        await asyncio.wait(task)

    def start(self, ip):
        loop = asyncio.get_event_loop()
        start_time = time.time()
        loop.run_until_complete(self.run(ip))
        print('总共耗时: %.2f' % (time.time() - start_time))

    @staticmethoddef ping_analysis(filename):
        data = [i.replace('\n', '') for i in open(filename, 'r', encoding='utf8').readlines()]
        ip_list = []
        for i in data:
            if '64 bytes' in i:
                ip_data = re.search(r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+', i)
                ip_list.append(ip_data.group())
        if len(ip_list) > 0:
            ip = list(set(ip_list))
            ip.sort()
            for j in ip:
                print(j)
            return ip
        else:
            print('所有 ip 均不通,或者传入的文件有误')
            return '所有 ip 均不通,或者传入的文件有误'


if __name__ == '__main__':
    taskManager = TaskManager()
    parser = argparse.ArgumentParser(description='TaskManager v1.3.9 build by hrj')
    parser.add_argument('path', help='文件路径')
    parser.add_argument('-a', action='store_true', help='多个 ip 执行不同的命令')
    parser.add_argument('-b', action='store', help='多个 ip 执行相同命令', metavar='[shell 命令]')
    parser.add_argument('-l', action='store_true', help='解析执行命令生成的日志')
    parser.add_argument('-ls', action='store_true', help='获取执行成功的 ip')
    parser.add_argument('-lf', action='store_true', help='获取执行失败的 ip')
    parser.add_argument('-p', action='store_true', help='异步 ping 多个 ip')
    parser.add_argument('-q', action='store_true', help='解析异步 ping 的能通的 ip')
    parser.add_argument('-put', nargs=2, help='多个 ip 批量 sftp 上传相同的文件', metavar=('[本地路径]', '[远程路径]'))
    parser.add_argument('-get', nargs=2, help='多个 ip 批量 sftp 下载相同的文件', metavar=('[远程路径]', '[本地路径]'))
    parser.add_argument('-auth', nargs=2, help='ssh/sftp 用户名、密码认证', metavar=('[用户名]', '[密码]'))

    args = parser.parse_args()
    taskManager.logger.debug('接受参数:%s'.encode('utf8') % str(vars(args)).encode('utf8'))

    # ssh
    username = None
    password = None
    auth_type = 'id_rsa'

    file_info = taskManager.get_info(args.path)

    # ssh 认证 if args.auth:
        username = args.auth[0]
        password = args.auth[1]
        auth_type = 'noAuth'

    if args.a:
        taskManager.batch_ssh(ip=file_info, cmd_type='many', user=username, passwd=password, auth=auth_type)
    elif args.b:
        taskManager.batch_ssh(ip=file_info, cmd=args.b, user=username, passwd=password, auth=auth_type)

    # sftpelif args.put:
        taskManager.batch_sftp_put(
            ip=file_info, local_path=args.put[0], remote_path=args.put[1],
            user=username, passwd=password, auth=auth_type)
    elif args.get:
        # sftp 多进程并发会出现代码瞬间执行完毕,不报错的情况,待解决。for ip in file_info:
            taskManager.sftp_get(
                ip=ip, remote_path=args.get[0], local_path=args.get[1], user=username, passwd=password,
                auth=auth_type
            )

    # 日志 elif args.l:
        taskManager.show_log(args.path)
    elif args.ls:
        taskManager.show_log(args.path, mode='success')
    elif args.lf:
        taskManager.show_log(args.path, mode='failed')

    # 异步 pingelif args.p:
        taskManager.start(file_info)
    elif args.q:
        taskManager.ping_analysis(args.path)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
  • 247.
  • 248.
  • 249.
  • 250.
  • 251.
  • 252.
  • 253.
  • 254.
  • 255.
  • 256.
  • 257.
  • 258.
  • 259.
  • 260.
  • 261.
  • 262.
  • 263.
  • 264.
  • 265.
  • 266.
  • 267.
  • 268.
  • 269.
  • 270.
  • 271.
  • 272.
  • 273.
  • 274.
  • 275.
  • 276.
  • 277.
  • 278.
  • 279.
  • 280.
  • 281.
  • 282.
  • 283.
  • 284.
  • 285.
  • 286.
  • 287.
  • 288.
  • 289.
  • 290.
  • 291.
  • 292.
  • 293.
  • 294.
  • 295.
  • 296.
  • 297.
  • 298.
  • 299.
  • 300.
  • 301.
  • 302.
  • 303.
  • 304.
  • 305.
  • 306.
  • 307.
  • 308.
  • 309.
  • 310.
  • 311.
  • 312.
  • 313.
  • 314.
  • 315.
  • 316.
  • 317.
  • 318.
  • 319.
  • 320.
  • 321.
  • 322.
  • 323.
  • 324.
  • 325.
  • 326.
  • 327.
  • 328.
  • 329.
  • 330.
  • 331.
  • 332.
  • 333.
  • 334.
  • 335.
  • 336.
  • 337.
  • 338.
  • 339.
  • 340.
  • 341.
  • 342.
  • 343.
  • 344.
  • 345.
  • 346.
  • 347.
  • 348.
  • 349.
  • 350.
  • 351.
  • 352.
  • 353.
  • 354.
  • 355.
  • 356.
  • 357.
  • 358.
  • 359.
  • 360.

使用示例:

参数说明:

  1. path为IP地址文件路径时,使用-a , -b , -get , -put, -p。-auth仅支持配合前4个参数一起使用。
  2. path为日志文件(/tmp/TaskManager/log/task_defalut_xxxxxx.log)路径时,使用-l, -ls, -lf。
  3. 当使用-p时需要重定向到文件才能用-q解析文件得到ping通的ip。
  4. 批量执行命令时,命令行末尾会有一个Task_sn:xxxxxxxxx这个是日志文件的名称不重复的部分,便于去找日志(感觉有点蹩脚,如果有好的办法请告诉我)。

日常刷命令:

密码认证:

公钥认证:

获取日志:

可以配合 grep,awk 等命令精准过滤。

总结

个人认为 Python 在初中级运维工作中的性质更像是工具,以提升工作效率、减少管理成本为主。可以从当前繁琐的工作中解脱出来,去探索更有价值的事情。python 本质上并不会减少故障的产生,所以在不同的阶段合理利用自身掌握的知识解决当前最重要的痛点,千万不要本末倒置。