ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RDS一键同步到新实例

2021-12-16 10:35:20  阅读:192  来源: 互联网

标签:RDS rds list req 一键 param add 实例 query


一.前情提要

因需求要将生产环境多台RDS给克隆到DEV环境,因为环境在调试,在生产变动时可能要重新同步一遍,这个调试周期要一个多月,期间主要是环境的部署配置。

阿里云RDS同区域同步使用DTS操作将很方便,配置DTS的迁移按照教程就能免费又快捷的迁移了。

但当RDS很多,又要不定时去迁移一下(同步要钱),就需要通过阿里云开放的api来进行批量操作了。

需要使用python3,安装阿里云核心库和rds库
pip3 install aliyun-python-sdk-core
pip3 install aliyun-python-sdk-rds
file

注意:
1.需要填写同步的信息,当完成前几个,后面一个失败,需要注释掉成功的实例信息
2.需要加上阿里云ak和sk
3.脚本会清空同步实例的库,不然也没法同步

二.脚本

#!/bin/python3
 
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
import ast, sys, pymysql, time, json
 
#填写信息主库和从库
#实例名,实例ID,要迁移的库,账号名,密码
rds_info = [
    [("生产会员", "rm-2zeasdasdsasd", "prod_userdb", "dtscp", "123456"),("dev会员", "rm-2asdsadd", "dev_userdb", "dtscp", "123456")],
    [("生产支付", "rm-2zfasda", "prod_paydb", "dtscp", "1233456"),("dev支付", "rm-2asdasdasd", "dev_paydb", "dtscp", "123456")],
    ]


#自动变量
dts_id = ""
dts_list = []

#资源信息
aliyun_user_ak = 'Lwewwewew'
aliyun_user_sk = 'AEwqewqrwqeq'
region_id = 'cn-beijing'
client = AcsClient(ak=aliyun_user_ak, secret=aliyun_user_sk, region_id=region_id, timeout=300)


#创建DTS
def Create_Dts(req):
    global dts_id
    global dts_list
    req.set_action_name('CreateMigrationJob')
    req.add_query_param('RegionId', "cn-hangzhou")
    req.add_query_param('Region', "cn-beijing")
    req.add_query_param('MigrationJobClass', "large")
    req.add_query_param('ClientToken', "fj2j3kk2jasjd")
 
    response = client.do_action(req)
    false = False
    true = True
    response_dict = eval(response)
    dts_id = response_dict['MigrationJobId']
    dts_list.append(dts_id)
 
    if response_dict['Success'] == True:
        print("创建任务成功,ID为: " + dts_id)
    else:
        print("创建任务失败")
        sys.exit()
 
#验证是否是空的库,0为空,1为不空
def Check_Table(cur):
    sql = "show tables;"
    cur.execute(sql)
    table_number = cur.fetchall()
    if table_number:
        return 1
    else:
        return 0
 
def Create_Link(rds_list):
    #部分拼接
    rds_dest_ip = rds_list[1][1] + ".mysql.rds.aliyuncs.com" #目标实例IP
 
    db = pymysql.connect(host=rds_dest_ip, port=3306, user=rds_list[1][3], passwd=rds_list[1][4], db=rds_list[1][2], charset='utf8')
    cursor = db.cursor() #使用cursor方法创建一个游标
 
    print("开始清理数据库" + rds_list[1][2])
    if Check_Table(cursor) == 0:
        print(rds_list[1][2] + "库已经是空的,跳过清理步骤")
        cursor.close()
        db.close()
    else:
        #这里输出drop开头的删除表命令,存到元组里
        sql = "show tables ;"
        cursor.execute(sql)
        table_bin = cursor.fetchall()
        cursor.execute('SET foreign_key_checks = 0;')
 
        #循环执行drop删除表命令
        for i in table_bin:
            sql = "drop table" + '`' + i[0] + '`'
            cursor.execute(sql)
        cursor.execute('SET foreign_key_checks = 1;')
 
        #检测是否清空了
        if Check_Table(cursor) == 1:
            print(rds_list[1][2] + "库清空失败")
        else:
            print(rds_list[1][2] + "清空成功")
 
        #提交操作并关闭链接
        db.commit()
        cursor.close()
        db.close()
 
def Conf_Dts(req,rds_list):
    #部分拼接
    rds_job_name = rds_list[0][0] + "-》" + rds_list[1][0] #rds实例名
    rds_sour_ip = rds_list[0][1] + ".mysql.rds.aliyuncs.com" #源实例IP
    rds_dest_ip = rds_list[1][1] + ".mysql.rds.aliyuncs.com" #目标实例IP
    rds_sour_dbname = '\"%s\"' %(rds_list[0][2])
    rds_dest_dbname = '\"%s\"' %(rds_list[1][2])
    global dts_id
    print("开始进行" + rds_job_name + "的DTS任务配置")
 
    req.set_action_name('ConfigureMigrationJob')
    req.add_query_param('RegionId', "cn-hangzhou")
    req.add_query_param('MigrationJobName', rds_job_name)
    req.add_query_param('SourceEndpoint.InstanceType', "RDS")
    req.add_query_param('DestinationEndpoint.InstanceType', "RDS")
    req.add_query_param('MigrationMode.StructureIntialization', "true")
    req.add_query_param('MigrationMode.DataIntialization', "true")
    req.add_query_param('MigrationMode.DataSynchronization', "false")
    req.add_query_param('MigrationObject', "[{     \"DBName\": %s,     \"NewDBName\": %s }]" %(rds_sour_dbname, rds_dest_dbname))
    req.add_query_param('MigrationJobId', dts_id)
    req.add_query_param('SourceEndpoint.InstanceID', rds_list[0][1])
    req.add_query_param('SourceEndpoint.EngineName', "Mysql")
    req.add_query_param('SourceEndpoint.Region', "cn-beijing")
    req.add_query_param('SourceEndpoint.IP', rds_sour_ip)
    req.add_query_param('SourceEndpoint.Port', "3306")
    req.add_query_param('SourceEndpoint.UserName', rds_list[0][3])
    req.add_query_param('SourceEndpoint.Password', rds_list[0][4])
    req.add_query_param('DestinationEndpoint.InstanceID', rds_list[1][1])
    req.add_query_param('DestinationEndpoint.EngineName', "Mysql")
    req.add_query_param('DestinationEndpoint.Region', "cn-beijing")
    req.add_query_param('DestinationEndpoint.IP', rds_dest_ip)
    req.add_query_param('DestinationEndpoint.Port', "3306")
    req.add_query_param('DestinationEndpoint.UserName', rds_list[1][3])
    req.add_query_param('DestinationEndpoint.Password', rds_list[1][4])
 
    response = client.do_action(req)
    print(str(response, encoding = 'utf-8'))
 
def Check_Dts(req, dts_name):
    req.set_action_name('DescribeMigrationJobStatus')
    req.add_query_param('RegionId', "cn-hangzhou")
    req.add_query_param('MigrationJobId', dts_name)
    response = client.do_action(request)
    res_dict = json.loads(response)
 
    dts_stat = {'NotStarted':'未启动',
        'Prechecking':'预检查中',
        'PrecheckFailed':'预检查失败',
        'Migrating':'迁移中',
        'Suspending':'暂停中',
        'MigrationFailed':'迁移失败',
        'Finished':'完成'}
 
    print(dts_name + "当前状态:" + dts_stat[res_dict['MigrationJobStatus']])

#开始执行
for rds_list in rds_info:
    #初始化
    request = CommonRequest()
    request.set_accept_format('json')
    request.set_domain('dts.aliyuncs.com')
    request.set_method('POST')
    request.set_protocol_type('https') # https | http
    request.set_version('2020-01-01')

    Create_Dts(request) #创建dts任务
    Create_Link(rds_list) #清理数据库
    Conf_Dts(request, rds_list) #配置DTS信息
    print("")

#循环显示状态
while True:
    for i in dts_list:
        Check_Dts(request, i)

    print("")
    time.sleep(30)

标签:RDS,rds,list,req,一键,param,add,实例,query
来源: https://www.cnblogs.com/rxysg/p/15696673.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有