ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

111

2022-05-25 18:03:48  阅读:145  来源: 互联网

标签:rows sheet list sheet1 key sheet2 111


import logging
from collections import namedtuple
from pathlib import Path
from typing import Dict, Iterable, Sequence, List

from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, Alignment, PatternFill, colors
from openpyxl.utils import get_column_letter

from lib import error

logger = logging.getLogger(__name__)
def init_log():
    log_dir = Path('./log')
    if not log_dir.exists():
        log_dir.mkdir(parents=True)
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s: %(message)s', handlers=[
        logging.StreamHandler(),
        logging.FileHandler(filename=log_dir/'excel.log', encoding='utf8')
    ])

class Field:
    def __init__(self, name, required=True, action=None) -> None:
        self.name = name
        self.required = required
        self.action = action


class RowTuple(tuple):
    """可以通过Field名字作为key访问的表格行Row
    """
    def __new__(cls, fields: Sequence, values: Sequence):
        # print(fields)
        # print(values)
        row = super().__new__(cls, values)
        row.fields = tuple(fields)
        row._map = None
        return row # 一行数据的元祖

    def __getitem__(self, field): # 在对象点语法获取它没有的属性和方法的时候自动触发
        if isinstance(field, int):
            return super().__getitem__(field)
        if not self._map:
            self._map = dict(zip(self.fields, self))
        # print(36, self._map)
        return self._map[field]

    def get(self, key, default=None):
        # print(40,self._map,key)
        value = self._map.get(key, default)
        return value if value else default


# def load_excel(excel_file: str, ws_names: dict):
#     wb = load_workbook(excel_file, data_only=True)
#     values = {}
#     for ws_name in ws_names:
#         try:
#             extract_fields = ws_names[ws_name]
#         except KeyError:
#             raise error.Error(f'"{ws_name}"表格不存在。')
#         rows = wb[ws_name].values
#         fields = next(rows)
#         for name in extract_fields:
#             if name not in fields:
#                 raise error.Error(f'表格"{ws_name}"中的"{name}"项不存在。')
#         data = []
#         for row in rows:
#             if not any(row):
#                 continue
#             rowdata = dict(zip(fields, row))
#             data.append({key: rowdata[key] for key in extract_fields})
#         values[ws_name] = data
#     return values

def reset_col(file_path):
    """
    1.调整各个sheet的各列宽,最大长度不超过默认宽度,不做调整防止列宽过窄。超过默认宽度则:列宽为最大长度*1.2,且最大为60
    2.对单元格设置样式,比如:
        1)超链接:1.设置蓝色斜体,加下划线 2.其长度不参与列宽设置
        2)表头居中,其他左对齐
    Args:
        file_path: 文件路径
    """
    wb = load_workbook(file_path)
    for sheet in wb.sheetnames:
        # 获取某个sheet对象
        ws = wb[sheet]

        # 遍历各列根据最长的一个cell的长度设置一列的长度
        for col in ws.columns:
            # col = (<Cell 'Sheet'.A1>, <Cell 'Sheet'.A2>, <Cell 'Sheet'.A3>)
            # print(col,ws.columns)
            index = list(ws.columns).index(col)  # 列序号 1、2、...
            letter = get_column_letter(index + 1)  # 列字母 A、B、C、...
            original_width = ws.column_dimensions[letter].width # 获取默认列宽
            max_length = 0
            # 获取这一列长度的最大值 当然也可以用: min获取最小值 mean获取平均值
            for index,cell in enumerate(col):
                try:
                    # # 第一步:表头水平居中,其他左对齐,所有单元格垂直居中自动换行
                    # if index == 0:
                    #     cell.alignment = Alignment(horizontal='center', vertical='center',wrapText= True)
                    # else:
                    #     cell.alignment = Alignment(horizontal="left", vertical='center',wrapText= True)

                    # 第一步:左对齐,所有单元格垂直居中自动换行
                    cell.alignment = Alignment(horizontal="left", vertical='center', wrapText=True)

                    # 第二步:如果是文件图片的超链接:1.设置蓝色字体,加下划线 2.其长度不参与列宽设置
                    if str(cell.value).startswith('=HYPERLINK'):
                        # cell.style = "Hyperlink" # 会变成蓝色字体但是没有下划线
                        cell.font = Font(underline='single',italic=True, color='0066FF') # 指定颜色带下划线
                        continue
                    # 第三步:查找列最长value的长度
                    if len(str(cell.value)) > max_length:
                        max_length = len(str(cell.value))
                except BaseException as e:
                    print(e)
                    pass
            preset_width = max_length* 1.2 if max_length* 1.2 < 60 else 60
            # 最大长度不超过默认宽度,不做调整防止列宽过窄。超过默认宽度则:列宽为最大长度*1.2,且最大为40
            adjusted_width = preset_width if max_length > original_width else  original_width
            ws.column_dimensions[letter].width = adjusted_width
    wb.save(file_path)
    return True

def save_excel(file_path, headers_iter: Iterable, rows_iter: Iterable):
    """
    生成空白表格,并在默认的sheet中写入数据
    Args:
        excel_file:文件路径,如:C:\\Users\\13154\\Desktop\\666\\094.xlsx
        field_names:headers的Iterable,如(1,2,3)
        rows:行的Iterable,为空则传None,如 [(11,None,13),(21,'',23)],如果要插入文件链接,单元格的字符按照以下的格式:
            1)绝对路径 换电脑后失效,不靠谱
                '=HYPERLINK("{}","{}")'.format(r'file:///C:\222.png','莫名弹窗.jpg')
            2)相对路径,这个靠谱
                '=HYPERLINK("{}","{}")'.format(r'./\222.png','莫名弹窗.jpg') #表示该excel并列的一张图片
                '=HYPERLINK("{}","{}")'.format(r'./files\222.png','莫名弹窗.jpg') # 表示该excel并列的files文件夹中的一张图片
    """
    # 创建一个工作簿
    wb = Workbook()
    # 获取当前表单对象,即默认生成的sheet
    ws = wb.active
    # 生成表头
    ws.append(headers_iter)
    # 生成各行数据
    for row in rows_iter:
        ws.append(row)
    excelpath = Path(file_path)
    excelpath.resolve() # 绝对化路径,并将路径转换为合适的格式,例如windows下将斜杠转化为反斜杠
    wb.save(excelpath)

    reset_col(file_path)
    return True

def save_new_sheet(excel_file, sheets_dic):
    """打开excel新增sheet
    :param excel_file:
    :param sheetname:
    :param data:
    :return:
    """
    # logger.info(f'save_new_sheet: sheets_dic:{sheets_dic}')
    # 打开文件
    workbook = load_workbook(excel_file, data_only=True)
    mark_flag_lists = sheets_dic.pop('mark_flag_lists')
    # 遍历数据,生成多个sheet
    for sheetname, data in sheets_dic.items():
        # 删除同名的工作表sheet
        if sheetname in workbook.sheetnames:
            workbook.remove(workbook[sheetname])
        # 创建新的sheet
        ws = workbook.create_sheet(title=sheetname)
        # # 生成表头
        # ws.append(data[0])
        # 生成各行数据
        for row_index,row in enumerate(data):
            ws.append(row)

            # 对于Miss_math这个sheet,错误的地方需要红色标记出来
            if sheetname=='Miss_math' and row_index!=0:
                for column_index in range(len(row)):
                    if mark_flag_lists[row_index-1][column_index] == 1:
                        cell_a = ws.cell(row_index+1, column_index+1) # excel行数 1-n
                        cell_a.fill = PatternFill("solid", fgColor="FF6633")
                        cell_a.font = Font(color=colors.BLACK, bold=True)

    excelpath = Path(excel_file)
    excelpath.resolve()  # 绝对化路径,并将路径转换为合适的格式,例如windows下将斜杠转化为反斜杠
    workbook.save(excelpath)
    reset_col(excel_file)

def load_excel(excel_file: str, primary_key_lis: list, sheet_names: Iterable = None) -> List[Dict[str, Iterable] or List[str, Iterable]]:
    """加载excel文件中指定sheet中的数据
    每个sheet里第一行必须是表格的header。
    Args:
        excel_file: excel文件的路径
        primary_key_lis: 二元列表,列表值为列的索引0-n,每一行这两个列的数据会拼接成一个primary_key用来标记每一行的数据
        sheet_names: 要读取的sheet名称。默认是None,表示全部读取

    Returns:
        返回一个[dict, list]
        dict: key是sheet的名字,value1也是一个dict,value2是namedtuple的Iterable,。如{'Sheet1': {'key':(2.1, 2.2), 'key2':(3.1, 3.2), 'key3':(4.1, 4.2)},}
        list: 列表项是Iterable,primary_key有重复的行只记录一次,其他的 sheet_name+每行数据 放进这里面
        rows_num_list: 两个sheet的数据行数,如[100, 99]
    """
    logger.info(f'load_excel: primary_key_lis:{primary_key_lis}, sheet_names:{sheet_names}')
    workbook = load_workbook(excel_file, data_only=True)
    if not sheet_names:
        sheet_names = workbook.sheetnames
    result = {}
    duplicate_rows_list = []
    rows_num_list = []
    for ws_name in sheet_names:
        logger.info(f'load_excel: start load {ws_name}')
        if ws_name not in workbook.sheetnames:
            logger.exception(f'表格"{ws_name}"不存在。')
            raise error.Error(f'表格"{ws_name}"不存在。')
        # 选取一个sheet表单
        sheet = workbook[ws_name]
        # 获取第一行表头
        header = [c.value for c in sheet[1]]
        # result[ws_name] = [
        #     RowTuple(header, values)
        #     for values in sheet.iter_rows(min_row=2, values_only=True)
        # ]

        rows_dic = {}
        rows = 0
        for values in sheet.iter_rows(min_row=2, values_only=True):
            rows += 1

            # 某两列的数据拼接成唯一primary_key作为对照的id
            rows_dic_key = f'{str(values[primary_key_lis[0]])}_{str(values[primary_key_lis[1]])}'
            # print(rows_dic_key)

            # 出现相同数据则数据有异常 或 primary_key选取不当
            if rows_dic_key in rows_dic.keys():
                logger.warning(f'find duplicate_rows :{rows_dic_key} value:{values}')
                lis = list(values)
                lis.insert(0, ws_name)
                duplicate_rows_list.append(lis)
                continue

            rows_dic[rows_dic_key] = RowTuple(header, values)

        rows_num_list.append(rows)

        # 额外添加表头字段进去,便于生成表头
        rows_dic['headers_lis'] = header
        result[ws_name] = rows_dic

    return [result, duplicate_rows_list, rows_num_list]

def compare_excels(excel_file: str, primary_key_lis: list, sheet1: str, sheet2: str, ignore_columns_list: Sequence=() ):
    """根据两个列组合primary_key,对两个excel表中两个sheet数据一行行的对比, 找出不同
    :param excel_file: 文件路径
    :param primary_key_lis: 组合primary_key的两个列的索引,如[0, 1]会将第一列和第二列拼接为primary_key
    :param sheet1 & sheet2: 要对比的两个工作表sheet名子
    :param ignore_columns_list: 要忽略的列的索引,0-n
    :return: 会在该excel中生成sheet1_only、sheet2_only、mismatch、duplicate 4个sheet
    """
    # print(f'compare_excels: primary_key_lis:{primary_key_lis}, sheet1:{sheet1}, sheet2:{sheet2}, ignore_columns_list:{ignore_columns_list}')
    logger.info(f'compare_excels: primary_key_lis:{primary_key_lis}, sheet1:{sheet1}, sheet2:{sheet2}, ignore_columns_list:{ignore_columns_list}')
    res_list = load_excel(excel_file= excel_file, primary_key_lis=primary_key_lis, sheet_names=[sheet1, sheet2])
    sheet1_dic = res_list[0][sheet1]
    sheet2_dic = res_list[0][sheet2]
    duplicate_rows_list = res_list[1]
    sheet1_num, sheet2_num = res_list[2]


    sheet1_only_list = [sheet1_dic['headers_lis'],]
    sheet2_only_list = [sheet2_dic['headers_lis'],]

    lis = ['source', ]
    lis.extend(sheet1_dic['headers_lis'])
    miss_math_list = [lis,]

    # 找出两个表不共同拥有的行, 一定是对不上的,分别存放
    sheet1_or_sheet2_ids = set(sheet1_dic.keys()) ^ set(sheet2_dic.keys())
    for id in sheet1_or_sheet2_ids:
        if id in set(sheet1_dic.keys()):
            sheet1_only_list.append(sheet1_dic.get(id))
        else:
            sheet2_only_list.append(sheet2_dic.get(id))

    # 找出共有的行,但数据不相同的
    sheet1_and_sheet2_ids = set(sheet1_dic.keys()) & set(sheet2_dic.keys())
    match_num = 0
    miss_math_num = 0
    mark_flag_lists = [] # 不匹配的数据标记出来
    # 第一步:遍历行
    for id in sheet1_and_sheet2_ids:
        row_sheet1 = sheet1_dic[id]
        row_sheet2 = sheet2_dic[id]
        mark_flag_list = []
        # 第二步:遍历每一行所有列
        for i in range(len(row_sheet1)):
            if i in ignore_columns_list:
                # logger.warning(f'这是第{i}列数据,可以忽略')
                mark_flag_list.append(0)
                continue
            if row_sheet1[i] != row_sheet2[i]:
                mark_flag_list.append(1)
                miss_math_num += 1
            else:
                mark_flag_list.append(0)

        # 只要一个标志位是1,则mismath
        if 1 in mark_flag_list:
            lis1 = [sheet1, ]
            lis1.extend(list(row_sheet1))
            miss_math_list.append(lis1)

            lis2 = [sheet2, ]
            lis2.extend(list(row_sheet2))
            miss_math_list.append(lis2)

            # 第一列source不需要标记
            mark_flag_list.insert(0, 0)
            # 两个表的不同,需要添加两行
            mark_flag_lists.append(mark_flag_list)
            mark_flag_lists.append(mark_flag_list)
        else:
            match_num += 1

    sheets_dict= {
        f'{sheet1}_only': sheet1_only_list,
        f'{sheet2}_only': sheet2_only_list,
        f'Miss_math': miss_math_list,
        f'Duplicate': duplicate_rows_list,
        'mark_flag_lists': mark_flag_lists, # [[0,0,1], [1,0,0], ...]
                 }

    save_new_sheet(excel_file, sheets_dic=sheets_dict)

    match_num -= 1 # 表头
    duplicate_rows = len(duplicate_rows_list)
    sheet1_only = len(sheet1_only_list) - 1
    sheet2_only = len(sheet2_only_list) - 1
    logger.info(f'{sheet1}:{sheet1_num}, {sheet2}:{sheet2_num}, match_num:{match_num}, miss_math_num:{miss_math_num},duplicate_rows:{duplicate_rows}, {sheet1}_only:{sheet1_only}, {sheet2}_only:{sheet2_only}')

 

标签:rows,sheet,list,sheet1,key,sheet2,111
来源: https://www.cnblogs.com/xp1315458571/p/16310151.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有