ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

python自动化发布小程序

2019-07-30 10:52:09  阅读:232  来源: 互联网

标签:code return python 程序 ...... list 自动化 print massage


环境介绍:

       该程序主要是使用python中的paramiko模块进行远程服务器的连接与命令执行。

       主要运行系统为:linux系统

       测试运行环境为:CentOS7.6

       python版本为:3.6.8

      使用模块有:os   json   paramiko2.5.0

1、配置文件,主要记录配置登录信息,执行命令的文件。

#配置文件说明:
#该配置文件必须为json格式,程序中已经固定了基本的模块。
#login为登录信息
#module为模块信息,一个模块相当于一台服务器的登录信息,信息要与login中的模块信息一致。
#commands为远程服务器命令信息,命令为字符串以逗号隔开。
#localcmds为本地服务器命令信息,命令为字符串以逗号隔开。
#allowcmds为允许执行的rm命令,当本地和远程命令中用rm命令就会匹配这里。命令为字符串以逗号隔开。如果为空就代表不允许执行任何rm命令。

{
"login": 
     {
      "1000":
             {
             "user":"root",
             "pwd":"root123", 
             "host":"192.168.3.3",
             "port":22
             },
       "2000":
             {
             "user":"root",
             "pwd":"root123", 
             "host":"192.168.3.4",
             "port":22
              }
      },
"module":["1000","2000"],     
"commands":[
"ls /opt", "pwd", "ps aux | grep tomcat" ], "localcmds":[
"pwd", "ls /root" ], "allowcmds":[
"rm -rf /usr/local/tomcat/webapps/ROOT/*" ] }

 

2、show.py文件,主要为整个流程的控制。

#!/usr/bin/env python
import judge, logic

#配置文件地址
config_addr = "/python/config.conf"
def show(config_addr):
    print("*****************启动自动化发布程序*****************")
    print("----------------开始读取配置文件----------------")
    jd = judge.Judge()
    info_dict = jd.configIsfile(config_addr)
    jd.checkCode(info_dict)
    lgc = logic.Logic()
    config_dict = lgc.readConfig(config_addr)
    jd.checkCode(config_dict)
    config_value = config_dict.get("value")
    print("----------------开始读取登录信息----------------")
    loginInfo_dict = config_value[0]print("读取完成......验证中......")
    jd.checkCode(jd.checklogin(loginInfo_dict))
    print("----------------开始读取模块录信息----------------")
    moduleInfo_list = config_value[1]print("读取完成......验证中......")
    jd.checkCode(jd.checkmodule(moduleInfo_list))
    print("----------------开始读取允许执行的'rm'命令信息----------------")
    allowcmdsInfo_list = config_value[2]print("读取完成......验证中......")
    jd.checkCode(jd.checkallowcmdsInfo(allowcmdsInfo_list))
    print("----------------开始读取本地命令信息----------------")
    localcmdsInfo_list = config_value[3]print("读取完成......验证中......")
    jd.checkCode(jd.checklocalcmds(localcmdsInfo_list, allowcmdsInfo_list))
    print("----------------开始读取远程服务器命令信息----------------")
    commandsInfo_list = config_value[4]print("读取完成......验证中......")
    jd.checkCode(jd.checkcommands(commandsInfo_list, allowcmdsInfo_list))
    #判断模块与登录信息是否匹配
    print("----------------匹配登录信息模块----------------")
    jd.checkCode(jd.moduleInLogin(moduleInfo_list, loginInfo_dict))
    #测试远程连接
    print("----------------开始测试远程连接服务器----------------")print("远程连接测试中......")
    for module in moduleInfo_list:
        info_dict = lgc.ssh(loginInfo_dict, module, ["cat /etc/redhat-release", "uname -a"])
        print("模块:'"+module+"'测试连接中......")if info_dict.get("code") == 1:
            print(info_dict.get("massage"))
            exit()
        print("模块:'" + module + "'测试连接成功......")
    print("远程连接测试完成......")
    # 获取本地命令并执行print("----------------开始执行本地命令----------------")print("命令执行中......")
    jd.checkCode(lgc.executeLocalcmd(localcmdsInfo_list))
    #建立远程连接
    print("----------------开始执行远程服务器命令----------------")print("命令执行中......")
    for module in moduleInfo_list:
        print("模块:‘"+module+"’服务器命令执行中......")
        info_dict = lgc.ssh(loginInfo_dict, module, commandsInfo_list)
        for info in info_dict.get("value"):
            print(info)
            time.sleep(1)
        jd.checkCode(info_dict)
    print("*****************自动化发布程序执行结束*****************")
if __name__ == "__main__":
    show(config_addr)

 

3、judge.py文件,主要做程序的判断功能。

import os

class Judge(object):
    #判断文件是否存在
    def configIsfile(self, config_addr):
        if os.path.isfile(config_addr):
            return {"code": 0, "massage": "配置文件读取中......"}
        else:
            return {"code": 1, "massage": "配置文件  '"+config_addr+"'  不存在!"}

    #判断登录信息
    def checklogin(self, loginInfo_dict):
        if isinstance(loginInfo_dict, dict):
            modules = loginInfo_dict.keys()
            for mod in modules:
                if isinstance(loginInfo_dict.get(mod), dict):
                    pass
                else:
                    return {"code": 1, "massage": "登录信息格式错误!"}
            return {"code": 0, "massage": "登录信息验证完成......"}
        else:
            return {"code": 1, "massage": "文件中无‘login’键的登录信息或‘login’值格式错误!"}

    #判断模块信息
    def checkmodule(self,moduleInfo_list):
        if isinstance(moduleInfo_list, list):
            errorinfo = []
            for mod in moduleInfo_list:
                if mod.isalnum():
                    pass
                else:
                    errorinfo.append(mod)
            if errorinfo.__len__() != 0:
                info = "模块必须由字母或数字组成!以下模块有误:"
                for i in errorinfo:
                    info += str(i)+" "
                return {"code": 1, "massage": info}
            else:
                return {"code": 0, "massage": "模块信息验证完成......"}
        else:
            return {"code": 1, "massage": "模块信息错误!"}

    #判断允许执行的rm命令
    def checkallowcmdsInfo(self,allowcmds_list):
        if isinstance(allowcmds_list, list):
            errorinfo = []
            for cmd in allowcmds_list:
                if isinstance(cmd, str) and "rm" in cmd or cmd.strip() == "":
                    pass
                else:
                    errorinfo.append(cmd)
            if errorinfo.__len__() != 0:
                info = "允许执行的‘rm’命令必须为以'rm'开头的字符串!以下命令有误:"
                for i in errorinfo:
                    info += str(i) + " "
                return {"code": 1, "massage": info}
            else:
                return {"code": 0, "massage": "允许执行的‘rm’命令信息验证完成......"}
        else:
            return {"code": 1, "massage": "命令信息错误!"}

    #判断本地命令
    def checklocalcmds(self, lcoalcmdsInfo_list, allowcmds_list):
        if isinstance(lcoalcmdsInfo_list, list):
            errorinfo = []
            for cmd in lcoalcmdsInfo_list:
                if isinstance(cmd, str):
                    if cmd[0:2] == "rm":
                        execute = "drop"
                        for allow in allowcmds_list:
                            if cmd.strip() == allow:
                                execute = "allow"
                                break
                        if execute == "drop":
                            return {"code": 1, "massage": cmd + " 命令禁制执行!"}
                else:
                    errorinfo.append(cmd)
            if errorinfo.__len__() != 0:
                info = "命令必须为字符串!以下命令有误:"
                for i in errorinfo:
                    info += str(i) + " "
                return {"code": 1, "massage": info}
            else:
                return {"code": 0, "massage": "本地命令信息验证完成......"}
        else:
            return {"code": 1, "massage": "命令信息错误!"}

    #判断远程服务器命令
    def checkcommands(self, commandsInfo_list, allowcmds_list):
        if isinstance(commandsInfo_list, list):
            errorinfo = []
            for cmd in commandsInfo_list:
                if isinstance(cmd, str):
                    if cmd[0:2] == "rm":
                        execute = "drop"
                        for allow in allowcmds_list:
                            if cmd.strip() == allow:
                                execute = "allow"
                                break
                        if execute == "drop":
                            return {"code": 1, "massage": cmd + " 命令禁制执行!"}
                else:
                    errorinfo.append(cmd)
            if errorinfo.__len__() != 0:
                info = "命令必须为字符串!以下命令有误:"
                for i in errorinfo:
                    info += str(i) + " "
                return {"code": 1, "massage": info}
            else:
                return {"code": 0, "massage": "远程服务器命令信息验证完成......"}
        else:
            return {"code": 1, "massage": "远程服务器命令信息错误!"}

    #判断模块与登录信息中的模块是否一致
    def moduleInLogin(self, modules_list, login_dict):
        keys = login_dict.keys()
        for module in modules_list:
            if module not in keys:
                return {"code": 1, "massage": "模块:'%s'在登录信息中不存在!"}
        return {"code": 0, "massage": "模块与登录信息匹配完成......"}

    #打印返回信息,并判断是否出错退出程序
    def checkCode(self, dictInfo):
        print(dictInfo.get("massage"))if dictInfo.get("code") == 1:
            exit()

 

4、logic.py文件,主要做程序的业务的文件读取、远程连接和命令执行等功能。

import paramiko, os, json


class Logic(object):
    #读取配置文件
    def readConfig(self, config_addr):
        try:
            # 读取配置文件
            with open(config_addr) as files:
                file = json.load(files)
        except Exception as e:
            print(e)
            return {"code": 1, "massage": "文件内容不是json格式或json格式错误!..."}
        try:
            # 获取登录服务器信息
            loginInfo = file.get("login")
            # 获取模块信息
            moduleInfo = file.get("module")
            # 获取可执行rm命令信息
            allowcmdsInfo = file.get("allowcmds")
            # 获取本地命令信息
            localInfo = file.get("localcmds")
            # 获取远程服务器命令信息
            cmdInfo = file.get("commands")
        except Exception as e:
            file.close()
            return {"code": 1, "massage": "获取信息错误!..." + e}
        file.close()
        return {"code": 0, "massage": "文件读取完成......", "value": [loginInfo, moduleInfo, allowcmdsInfo, localInfo, cmdInfo]}

    #执行本地命令
    def executeLocalcmd(self, localcmds):
        for cmd in localcmds:
            try:
                os.system(cmd)
            except Exception as e:
                return {"code": 1, "massage": "本地命令执行出错!..."+e}
        return {"code": 0, "massage": "本地命令执行完成......"}

    #远程连接服务器并执行命令
    def ssh(self, loginInfo_dict, module, commandsInfo_list):
        logininfo = loginInfo_dict.get(module)
        host = logininfo.get("host")
        user = logininfo.get("user")
        pwd = logininfo.get("pwd")
        port = logininfo.get("port")
        sshclient = paramiko.SSHClient()
        sshclient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            sshclient.connect(hostname=host, username=user, password=pwd, port=port)
        except Exception as e:
            return {"code": 1, "massage": "远程连接错误!"+e}
        info_list = []
        for cmd in commandsInfo_list:
            try:
                stdin, stout, sterr = sshclient.exec_command(cmd)
            except Exception as e:
                return {"code": 1, "massage": "远程服务器命令执行错误!" + e}
            info_list.append(stout.read().decode())
        sshclient.close()
        return {"code": 0, "massage": "远程服务器命令执行完成......", "value": info_list}

 

 5、执行程序

python show.py

 

标签:code,return,python,程序,......,list,自动化,print,massage
来源: https://www.cnblogs.com/NanZhiHan/p/11268298.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有