ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

hive导致表锁的SQL和租户信息获取

2021-11-05 19:35:08  阅读:141  来源: 互联网

标签:info SQL list hive sql query line 表锁 id


hive导致表锁的SQL和租户信息获取

多租户平台存在同一个表有多个租户具有访问权限,hive提供锁机制,保障数据的安全。由此会引发由于某个租户占用锁时间较长,其他租户作业滞后,如果将导致表锁的SQL和租户信息截取到,可以提供给局方进行业务流程的优化,加快作业执行效率。本文档记录一个可实现的方法供参考

获取表锁信息

场景:hive使用分布式协调服务(Zookeeper)提供的分布式锁,hive底层由mapreduce执行,yarn统一资源调度,本文档由python语言实现

关键代码

# xxxx
# 2021/11/4 8:54
import configparser
import os
import time
from kazoo.client import KazooClient
import ConnMysqlTmpl

'''
#获取zookeeper集群信息
ZookeeperInfo.ini文件为:
[zookeeper-conn]
url=zk_host_name:port
'''
def zookeeper_info():
   cf = configparser.ConfigParser()
   pass1 = os.path.dirname(os.path.abspath('.'))
   cf.read(pass1 + "/config/ZookeeperInfo.ini")
   items = cf.items("zookeeper-conn")
   list = []
   for line in items:
       list.append(line[1])
   return list

'''
#定义一个zookeeper类,实现zk集群的链接和关闭
'''
class Zookeeper:
   def connection(self,zk_url):
       self.zk_url=zk_url
       try:
           self.conn_path=KazooClient(self.zk_url)
           self.conn_path.start()
           return self.conn_path
       except Exception as e:
           print(f"SSH连接异常, 错误如下: {e}")

   def conn_close(self):
       self.conn_path.stop()
       print("已关闭Zookeeper连接...")

'''
#获取需要监控的表
HiveMonLockInfo文件格式随意,只要在zk中拿到需要监控的表路径即可
'''
def monitor_info():
   filename = 'datainfo/HiveMonLockInfo'
   if os.path.exists(filename):
       with open(filename, 'r', encoding='utf8') as rfile:
           lines = rfile.readlines()
       return lines
   else:
       print("未找到文件:【 {0} 】信息,请联系后台管理员处理".format(filename))

'''
#获取表锁列表
'''
def get_lock():
   zk=Zookeeper()
   zk_info = zookeeper_info()
   zk_conn=zk.connection(zk_info[0])
   table_info=monitor_info()
   locks = []
   for table in table_info:
       if zk_conn.exists("/hive_zookeeper_namespace/" + table.rstrip('\n')):
           lock_name = zk_conn.get_children("/hive_zookeeper_namespace/" + table.rstrip('\n'))
           for lock in lock_name:
               if "LOCK" in lock:
                   locks.append("/hive_zookeeper_namespace/"+table.rstrip('\n')+"/"+lock)
               else:
                   pass
       else:
           print("/hive_zookeeper_namespace/" + table.rstrip('\n'), "无锁情况", get_time())
   zk.conn_close()
   return locks

'''
#获取每个表锁的具体信息
table_locks_info为入库执行的语句
'''
def get_lock_info():
   zk = Zookeeper()
   zk_info = zookeeper_info()
   zk_conn = zk.connection(zk_info[0])
   all_locks = get_lock()
   print(all_locks)
   if all_locks:
       for path in all_locks:
           #获取所有锁node信息
           all_info=zk_conn.get(path)

           #获取query_id和sql语句
           all_sql_info = str(all_info).split("ZnodeStat")[0]
           query_id = (all_sql_info.split(":")[0])[3:]
           sql_info_tmp = all_sql_info.split(":")[3]
           sql_info_tmp1=sql_info_tmp.replace("\n","\\n")
           sql_info=sql_info_tmp1.replace("\'","\\'")

           #获取节点创建信息
           time_info = str(all_info).split("ZnodeStat")[1]
           create_time = time_info.split(",")[2]

           #时间戳转换
           timeStamp = int(create_time.split("=")[1])
           '''
          爬取下来的时间戳长度都是13位的数字,而time.localtime的参数要的长度是10位,所以我们需要将其/1000并取整即可
          '''
           timeArray = time.localtime(timeStamp/1000)
           otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
           try:
               ConnMysqlTmpl.table_locks_info(query_id,sql_info,otherStyleTime,path.split("/")[3])
           except Exception as e:
               print(f"执行SQL失败,请排查:{e}")
   else:
       print("所监控的表无锁情况~", get_time())
   zk.conn_close()

def get_time():
   create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
   return create_time

if __name__ == '__main__':
   get_lock_info()

主要说明

#获取所有锁node信息

#获取query_id和sql语句
all_sql_info = str(all_info).split("ZnodeStat")[0]
query_id = (all_sql_info.split(":")[0])[3:]
sql_info_tmp = all_sql_info.split(":")[3]
sql_info_tmp1=sql_info_tmp.replace("\n","\\n")
sql_info=sql_info_tmp1.replace("\'","\\'")

/SQL语句和SQL语句的query_id/

获取运行作业的query_id和租户

关键代码

import requests
import configparser
import os
import AppSourceInfo
import ConnMysqlTmpl
from json import JSONDecodeError
from requests.auth import HTTPBasicAuth

'''
#获取配置信息:访问url
'''
def get_app_info():
   cf=configparser.ConfigParser()
   pass1=os.path.dirname(os.path.abspath("."))
   cf.read(pass1 + "/config/AppExecSqlInfo.ini")
   items = cf.items("appexecsql-conn")
   list = []
   for line in items:
       list.append(line[1])
   return list

'''
#获取运行状态下作业的query_id和租户
'''
def job_info():
   conn_info=get_app_info()
   app_ids = AppSourceInfo.get_each_app1()
   if app_ids:
       for line in app_ids:
           try:
               all_info_tmp=requests.get(conn_info[0]+line+"/ws/v1/mapreduce/jobs/"+str(line).replace("application","job")+"/conf",
                             auth=HTTPBasicAuth(conn_info[1],conn_info[2]))
               try:
                   info_json=all_info_tmp.json()
                   result=info_json["conf"]
                   all_info=result["property"]
                   list=[]
                   for line in all_info:
                       '''
                      if line["name"] == "hive.query.string":
                          #list.append(str(line["value"]))
                          sql_info_tmp = line["value"]
                          if sql_info_tmp:
                              sql_info_tmp1 = sql_info_tmp.replace("\n", "\\n")
                              sql_info = sql_info_tmp1.replace("\'", "\\'")
                              list.append(sql_info)
                          else:
                              sql_info = "0"
                              list.append(sql_info)
                      elif line["name"] == "mapreduce.jdbc.input.query":
                          #list.append(str(line["value"]))
                          jdbc_query_tmp = line["value"]
                          #print(type(jdbc_query_tmp))
                          if len(jdbc_query_tmp):
                              jdbc_query_tmp1 = jdbc_query_tmp.replace("\n", "\\n")
                              jdbc_query = jdbc_query_tmp1.replace("\'", "\\'")
                              list.append(jdbc_query)
                          else:
                              jdbc_query = "0"
                              list.append(jdbc_query)
                      '''
                       
                       #获取query_id
                       if line["name"] == "hive.query.id":
                           #list.append(str(line["value"]))
                           query_id_tmp = line["value"]

                           if query_id_tmp:
                               query_id = line["value"]
                               list.append(query_id)
                           else:
                               query_id = "0"
                               list.append(query_id)
                               
                        #获取执行作业的队列名称,就能对应到租户
                       elif line["name"] == "mapreduce.job.queuename":
                           #list.append(str(line["value"]))
                           user_info = (line["source"])[1]
                           user = str(user_info).split("/")
                           queue_name = user[4]
                           list.append(queue_name)
                       else:
                           pass
                   print(list)
                   print(len(list))
                   try:
                       pass
                       ConnMysqlTmpl.user_app_query_info1(list)
                   except Exception as e:
                       print(f"数据入库失败,请排查:{e}")
               except JSONDecodeError as e:
                   print("{0}作业非MapReduce任务,请排查~".format(line))
           except Exception as e:
               print("获取app信息失败,请排查 {0}".format(e))
   else:
       print("获取app_id信息失败,请排查~")

主要说明

· 执行前(ACCEPTED)状态下的作业,内容编码存在异常,无法通过yarn rest api进行读取

· 执行完成的作业(FINISHED)的作业无异常情况下都会及时的释放锁,如果集群开始了日志服务,可以访问jobhistoryserver服务读取对应日志

获取运行状态下作业的信息,链接为:

/http://resouecemanage:port/applicationID/ws/v1/mapreduce/jobs/jobId/conf/

将返回的数据进行json格式转换:

info_json=all_info_tmp.json() result=info_json["conf"] all_info=result["property"]

获取query_id:

if line["name"] == "hive.query.id":
                           #list.append(str(line["value"]))
                           query_id_tmp = line["value"]

获取资源队列名称:

line["name"] == "mapreduce.job.queuename":
                           #list.append(str(line["value"]))
                           user_info = (line["source"])[1]
                           user = str(user_info).split("/")
                           queue_name = user[4]
                           list.append(queue_name)

效果呈现

SELECT
a.query_id,
a.queue_name,
b.sql_info,
b.table_name,
b.ctime
FROM
user_app_query_info a
JOIN zookeeper_lock_info b
WHERE
a.query_id = b.query_id

两个表根据query_id进行关联,获取到锁表情况下,锁表的SQL语句和租户信息。

标签:info,SQL,list,hive,sql,query,line,表锁,id
来源: https://www.cnblogs.com/zxyax132620/p/15514885.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有