ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【实战项目】API数据传输

2020-09-16 15:32:25  阅读:11  来源: 互联网

标签:实战 log self sql API WLN time 数据传输 config


一. 项目简述

  向xxxxAPI定期传输指定数据。

 

二. 项目设计

  1. 基础功能:

  【功能 1】 提取数据

  描述 :连接数据库,执行SQL语句,读取所需数据,并将其另存为 .xlsx 文件。

  【功能 2】 登陆API

  描述 :录入基础参数,生成签名,上传指定参数,完成接口鉴权。

  【功能 3】 获取上传路径

  描述 :解析接口鉴权的返回参数, 加入上传文件名, 获取临时上传地址。

  【功能 4】上传文件

  描述 : 向临时上传地址上传文件,并获取该文件上传后的临时下载地址。

  2. 优化功能:

  【功能 1】 设定配置文件

  描述:为了减少代码修改,将配置信息存放于 .ini 文件,需要时读取。

  【功能 2】 设定日志

  描述:为了跟踪程序运行情况,在异常出现时能及时人工介入,针对主要步骤抛出异常,存储入日志,并将日志实时发送至手机端。

 

三. 类与功能模块

  0. 第三方库与全局变量

  描述:创建日志初始信息

  全局变量:日志字典,数据的所属日期,日志计数器

  第三方库及函数解析:

  (1)datetime

    datetime.datetime.now() 获得当前时间,返回值为 datetime.datetime

    datetime.timedelta() 返回时间增量,返回值为 datetime.timedelta

  (2)time

    time.time() 当前时间的时间戳(1970纪元后经过的浮点秒数),返回值为 float

    time.localtime() 格式化时间戳为本地时间,返回值为 time.struct_time

    time.strftime()  接收 time.struct_time型值,可自定义格式化,返回值为 str

import time
import struct
import hashlib
import requests
import random
import json
import pymssql
import pandas as pd
from configobj import ConfigObj
import datetime


log = {}
date = (datetime.datetime.now() + datetime.timedelta(days = -1)).strftime("%Y-%m-%d")
counter = 0
log["DDATE"] = date
log["STIME"] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))

  

  1. 文件类

  描述:用于从数据库中提取数据,并生成文件

  类的实例变量:数据库域名,用户名,密码,指定数据库名,指定表名,格式化时间(用于生成文件名)

  功能模块:connect()

  第三方库及函数解析:

  (1)configobj 

    ConfigObj(filename, encoding='UTF8')  解析配置文件 .ini

  (2)pymssql

    conn = pymssql.connect(host, username, password, database) 连接数据库

    cur = conn.cursor() 创建游标

    cur.close() 关闭游标

    cur.execute(sql) 执行SQL语句

    conn.commit() 提交数据,若SQL语句为增删改时,需要提交数据使其生效

    result = cur.fetchall() 获取全部数据

  (3)pandas

    writer = pd.ExcelWriter(fname) 打开指定excel文件,若不存在则新建

    df = pd.DataFrame(result, index=None, columns=[]) 将数据整理成指定格式的数据框

    df.to_excel(writer, index=False, sheet_name='...') 将数据框写入excel文件的指定表中

    writer.save() 保存excel文件

class File(object):
def __init__(self): self.domain = '' self.username = '' self.password = '' self.database = '' self.table = '' self.now = time.strftime("%Y%m%d_%H",time.localtime(time.time())) def connect(self): # 读取配置信息 config = ConfigObj('config.ini', encoding='UTF8') self.domain = config['sql_server']['domain'] self.username = config['sql_server']['username'] self.password = config['sql_server']['password'] self.database = config['sql_server']['database'] self.table = config['sql_server']['table'] try: conn = pymssql.connect(self.domain, self.username, self.password, self.database) log["STEP"] = "6-1 成功" except pymssql.OperationalError: log["STEP"] = "6-1 数据库连接失败" # 设定日志计数器 cur1 = conn.cursor() count_sql = ''' SELECT COUNT(*) FROM {table} WHERE DDATE = '{values}' '''.format(table=self.table, values=date) cur1.execute(count_sql) global counter counter = cur1.fetchall()[0][0] + 1 log["COUNTER"] = counter cur1.close() # 填写日志 cur2 = conn.cursor() keys = ', '.join(log.keys()) values = ', '.join(['%s']*len(log)) sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=self.table, keys=keys, values=values) cur2.execute(sql, tuple(log.values())) conn.commit() cur2.close() # 提取第一份数据 if log["STEP"] == "6-1 成功": cur3 = conn.cursor() sql = ''' SELECT DDATE , ORGNAME , SHPID , SHPNO , SHPNAME , DEC_ODATE , ADDR , NUM_PSN , AREA , SLAMT , GST_QTY , Uprice , AmtPA , AmtPP , SLAMT_SVS , GST_QTY_SVS , Uprice_SVS , PCT_PLU , PCT_TBC , AVG_SLAMT , AVG_GST_QTY , AVG_Uprice , AVG_AmtPA , AVG_AmtPP , WLN_DAY1 , WLN_DAY2 , WLN_DAY3 , WLN_DAY4 , WLN_DAY5 , WLN_DAY6 , WLN_DAY7 FROM OUT_Tencent_N1_SALEDAYSHP WITH(NOLOCK) WHERE DDATE = DATEADD(DAY, -1, CONVERT(VARCHAR(100), GETDATE(), 23)) ''' cur3.execute(sql) result1 = cur3.fetchall() if result1 != []: log["STEP"] = "6-2 成功" # 数据写入文件 fname = "日销售数据_" + self.now + ".xlsx" writer = pd.ExcelWriter(fname) df1 = pd.DataFrame(result1, index=None, columns=["DDATE", "ORGNAME", "SHPID", "SHPNO", "SHPNAME", "DEC_ODATE", "ADDR", "NUM_PSN", "AREA", "SLAMT", "GST_QTY", "Uprice", "AmtPA", "AmtPP", "SLAMT_SVS", "GST_QTY_SVS", "Uprice_SVS", "PCT_PLU", "PCT_TBC", "AVG_SLAMT", "AVG_GST_QTY", "AVG_Uprice", "AVG_AmtPA", "AVG_AmtPP", "WLN_DAY1", "WLN_DAY2", "WLN_DAY3", "WLN_DAY4", "WLN_DAY5", "WLN_DAY6", "WLN_DAY7"]) df1.to_excel(writer, index=False, sheet_name='门店日销售表') cur3.close() else: log["STEP"] = "6-2 第一次提取数据为空" if log["STEP"] == "6-2 成功": # 提取第二份数据 cur4 = conn.cursor() sql = ''' SELECT DDATE , ORGNAME , CLSID , CLSNO , CLSNAME , SLAMT , VALUE , UNIT , AVG_SLAMT , AVG_VALUE , WLN_WEEK1 , WLN_WEEK2 , WLN_WEEK3 , WLN_WEEK4 , WLN_WEEK5 , WLN_WEEK6 FROM OUT_Tencent_N2_SALEDAYCLS WITH(NOLOCK) WHERE DDATE = DATEADD(DAY, -1, CONVERT(VARCHAR(100), GETDATE(), 23)) ''' cur4.execute(sql) result2 = cur4.fetchall() if result1 != []: log["STEP"] = "6-3 成功" df2 = pd.DataFrame(result2, index=None, columns=["DDATE", "ORGNAME", "CLSID", "CLSNO", "CLSNAME", "SLAMT", "VALUE", "UNIT", "AVG_SLAMT", "AVG_VALUE", "WLN_WEEK1", "WLN_WEEK2", "WLN_WEEK3", "WLN_WEEK4", "WLN_WEEK5", "WLN_WEEK6"]) df2.to_excel(writer, index=False, sheet_name='分类日销售表') cur4.close() conn.close() writer.save() else: log["STEP"] = "6-3 第二次提取数据为空"

 

  2. 数据类

  描述:用于将数据文件上传至腾讯微瓴API

  类的实例变量:应用ID,应用密钥,应用票据,域名,文件名,时间戳,随机数,动态密钥,物联票据,当前时间

  功能模块:generateSignature(),importFiles(),uploadFiles()

  第三方库及函数解析:

  (1)requests

    requests.get(url, params) 请求目标网站,“查询”信息

    requests.put(url, data) 请求目标网站,“增添” / “上传”信息

  (2)json

    json.loads() 将json格式数据转化为字典

  (3)struct

    struct.pack() 按照给定格式封装字符串

  (4)hashlib

    hashlib.sha1().hexdigest() 将指定字符串进行SHA1加密,并返回十六进制摘要

class Data(object):
    def __init__(self):
        self.appid = ''
        self.appKey = ''
        self.app_ticket = ''
        self.domain = ''
        self.file_name = ''
        self.timeStamp = str(int(round(time.time() * 1000)))
        self.randomNum = ''.join(str(random.choice(range(16))) for _ in range(10))     
        self.token = ''
        self.iotim_ticket = ''
        self.now = time.strftime("%Y%m%d_%H",time.localtime(time.time())) 


    def generateSignature(self):
        signature_str = struct.pack('=12sqq',self.appKey,int(self.timeStamp),int(self.randomNum))
        signature = hashlib.sha1(signature_str).hexdigest()
        return signature


    def importFiles(self):
        with open(self.file_name, "rb") as f:
            data = f.read()
        return data


    def uploadFiles(self):
        # 读取配置信息:应用标识、应用密码、应用票据、API接口域名、文件名
        # config  = ConfigObj('E:\业务建模\API接口\config.ini', encoding='UTF8')
        config  = ConfigObj('config.ini', encoding='UTF8')
        self.appid = config['API']['appid']
        self.appKey = bytes(config['API']['appKey'], encoding="utf8")
        self.app_ticket = config['API']['app_ticket']
        self.domain = config['API']['domain']
        self.file_name = config['API']['file_name'] + self.now + ".xlsx"
        
        # 接口鉴权
        params = {'time': self.timeStamp,
                  'num': self.randomNum,
                  'sig': self.generateSignature(),
                  'appid': self.appid,
                  'app_ticket': self.app_ticket}
        if log["STEP"] == "6-3 成功":
            try:
                res = requests.get(url=self.domain + "/common/ticket/loginByApp", params=params)
                # print("接口鉴权信息:" + res.content)
                msg = json.loads(res.content)
                self.token = msg[u'data'][u"token"]
                self.iotim_ticket = msg[u"data"][u"iotim_ticket"]
                log["STEP"] = "6-4 成功" 
            except:
                log["STEP"] = "6-4 接口鉴权失败" 

        # 获取临时上传地址
        params1 = {"token": self.token,
                   "iotim_ticket": self.iotim_ticket,
                   "file_name": self.file_name}
        if log["STEP"] == "6-4 成功":
            try:      
                response1 = requests.get(url=self.domain + "/common/fileservice/uploadTemporaryFile", params=params1)
                # print("临时上传:" + response1.content.decode("utf8"))
                content = json.loads(response1.content)
                upload_url = content[u'data'][u'upload_url']
                file_id = content[u'data'][u'file_id']
                log["STEP"] = "6-5 成功" 
            except:
                log["STEP"] = "6-5 上传地址获取失败"

        # 上传文件并获取临时下载地址
        params2 = {"token": self.token,
                   "iotim_ticket": self.iotim_ticket,
                   "file_id": file_id}
        if log["STEP"] == "6-5 成功":
            try:
                upload_imagdata = requests.put(url=upload_url, data=self.importFiles())
                response2 = requests.get(url=self.domain + "/common/fileservice/downloadTemporaryFile", params=params2)
                # print("临时下载::" + response2.content.decode("utf8"))
                log["STEP"] = "6-6 成功"
            except:
                log["STEP"] = "6-6 文件上传失败"

 

  3. 日志类

  描述:用于记录程序运行日志

  类的实例变量:域名,用户名,密码,数据库名,表名

  功能模块:connect()

 

class Log(object):
    def __init__(self):
        self.domain = ''
        self.username = ''
        self.password = ''
        self.database = ''
        self.table = ''


    def connect(self):
        # 读取配置信息:数据库服务器、用户名、密码、指定数据库
        # config  = ConfigObj('E:\业务建模\API接口\config.ini', encoding='UTF8')
        config  = ConfigObj('config.ini', encoding='UTF8')
        self.domain = config['sql_server']['domain']
        self.username = config['sql_server']['username']
        self.password = config['sql_server']['password']
        self.database = config['sql_server']['database']
        self.table = config['sql_server']['table']

        conn = pymssql.connect(self.domain, self.username, self.password, self.database)
        cur = conn.cursor()
        log["ETIME"] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
        keys = ', '.join(log.keys())
        values = ', '.join(['%s']*len(log))
        sql = '''
        UPDATE {table} SET
        '''.format(table=self.table)
        update = ','.join([" {key} = %s".format(key=key) for key in log])
        sql += update
        sql_2 = '''
        WHERE DDATE = '{date}' AND COUNTER = '{counter}'
        '''.format(date=date, counter=counter)
        sql += sql_2
        cur.execute(sql, tuple(log.values())*2)
        conn.commit()
        cur.close()
        conn.close()

  

  4. 执行

  描述:用于运行程序

if __name__ == '__main__':
    files = File()
    files.connect()

    if log["STEP"] == "6-3 成功":
        data = Data()
        data.uploadFiles()

    record = Log()
    record.connect()

 

四. 项目难点

  难点 1 :windows10开发环境下的.py文件部署至windows2012服务器,服务器为新,不便操作环境配置

  解决方法:将.py文件封装为.exe可执行程序,定时运行

    (1)安装 pyinstaller,终端写入

pip install pyinstaller

    (2)将.py文件封装,终端写入

pyinstaller -F XXX.py

    (3)解决封装过程中的Python递归深度异常,打开.spec文件写入

import sys

sys.setrecursionlimit(50000)

    (4)解决UTF8乱码问题,终端写入

chcp 65001

    (5)将.spec文件封装为.exe文件,终端写入

pyinstaller -F XXX.spec

    (6)将生成的.exe文件与.ini配置文件一并上传至服务器,存于同一文件夹中下,设定windows定时作业

 

五. 项目总结

  第一次尝试了编写Python将数据上传至API的脚本程序,也是Python的首个实战项目,学到了很多第三方模块的运用。程序的优化仍需加强,继续学习!

 

  

  

标签:实战,log,self,sql,API,WLN,time,数据传输,config
来源: https://www.cnblogs.com/poppylibrary/p/13679366.html

专注分享技术,共同学习,共同进步。侵权联系[admin#icode9.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有