ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【实战项目】API数据传输

2020-09-16 15:32:25  阅读:208  来源: 互联网

标签:实战 log self sql API WLN time 数据传输 config


一. 项目简述

  向xxxxAPI定期传输指定数据。

 

二. 项目设计

  1. 基础功能:

  【功能 1】 提取数据

  描述 :连接数据库,执行SQL语句,读取所需数据,并将其另存为 .xlsx 文件。

  【功能 2】 登陆API

  描述 :录入基础参数,生成签名,上传指定参数,完成接口鉴权。

  【功能 3】 获取上传路径

  描述 :解析接口鉴权的返回参数, 加入上传文件名, 获取临时上传地址。

  【功能 4】上传文件

  描述 : 向临时上传地址上传文件,并获取该文件上传后的临时下载地址。

  2. 优化功能:

  【功能 1】 设定配置文件

  描述:为了减少代码修改,将配置信息存放于 .ini 文件,需要时读取。

  【功能 2】 设定日志

  描述:为了跟踪程序运行情况,在异常出现时能及时人工介入,针对主要步骤抛出异常,存储入日志,并将日志实时发送至手机端。

 

三. 类与功能模块

  0. 第三方库与全局变量

  描述:创建日志初始信息

  全局变量:日志字典,数据的所属日期,日志计数器

  第三方库及函数解析:

  (1)datetime

    datetime.datetime.now() 获得当前时间,返回值为 datetime.datetime

    datetime.timedelta() 返回时间增量,返回值为 datetime.timedelta

  (2)time

    time.time() 当前时间的时间戳(1970纪元后经过的浮点秒数),返回值为 float

    time.localtime() 格式化时间戳为本地时间,返回值为 time.struct_time

    time.strftime()  接收 time.struct_time型值,可自定义格式化,返回值为 str

import time
import struct
import hashlib
import requests
import random
import json
import pymssql
import pandas as pd
from configobj import ConfigObj
import datetime


log = {}
date = (datetime.datetime.now() + datetime.timedelta(days = -1)).strftime("%Y-%m-%d")
counter = 0
log["DDATE"] = date
log["STIME"] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))

  

  1. 文件类

  描述:用于从数据库中提取数据,并生成文件

  类的实例变量:数据库域名,用户名,密码,指定数据库名,指定表名,格式化时间(用于生成文件名)

  功能模块:connect()

  第三方库及函数解析:

  (1)configobj 

    ConfigObj(filename, encoding='UTF8')  解析配置文件 .ini

  (2)pymssql

    conn = pymssql.connect(host, username, password, database) 连接数据库

    cur = conn.cursor() 创建游标

    cur.close() 关闭游标

    cur.execute(sql) 执行SQL语句

    conn.commit() 提交数据,若SQL语句为增删改时,需要提交数据使其生效

    result = cur.fetchall() 获取全部数据

  (3)pandas

    writer = pd.ExcelWriter(fname) 打开指定excel文件,若不存在则新建

    df = pd.DataFrame(result, index=None, columns=[]) 将数据整理成指定格式的数据框

    df.to_excel(writer, index=False, sheet_name='...') 将数据框写入excel文件的指定表中

    writer.save() 保存excel文件

class File(object):
def __init__(self): self.domain = '' self.username = '' self.password = '' self.database = '' self.table = '' self.now = time.strftime("%Y%m%d_%H",time.localtime(time.time())) def connect(self): # 读取配置信息 config = ConfigObj('config.ini', encoding='UTF8') self.domain = config['sql_server']['domain'] self.username = config['sql_server']['username'] self.password = config['sql_server']['password'] self.database = config['sql_server']['database'] self.table = config['sql_server']['table'] try: conn = pymssql.connect(self.domain, self.username, self.password, self.database) log["STEP"] = "6-1 成功" except pymssql.OperationalError: log["STEP"] = "6-1 数据库连接失败" # 设定日志计数器 cur1 = conn.cursor() count_sql = ''' SELECT COUNT(*) FROM {table} WHERE DDATE = '{values}' '''.format(table=self.table, values=date) cur1.execute(count_sql) global counter counter = cur1.fetchall()[0][0] + 1 log["COUNTER"] = counter cur1.close() # 填写日志 cur2 = conn.cursor() keys = ', '.join(log.keys()) values = ', '.join(['%s']*len(log)) sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=self.table, keys=keys, values=values) cur2.execute(sql, tuple(log.values())) conn.commit() cur2.close() # 提取第一份数据 if log["STEP"] == "6-1 成功": cur3 = conn.cursor() sql = ''' SELECT DDATE , ORGNAME , SHPID , SHPNO , SHPNAME , DEC_ODATE , ADDR , NUM_PSN , AREA , SLAMT , GST_QTY , Uprice , AmtPA , AmtPP , SLAMT_SVS , GST_QTY_SVS , Uprice_SVS , PCT_PLU , PCT_TBC , AVG_SLAMT , AVG_GST_QTY , AVG_Uprice , AVG_AmtPA , AVG_AmtPP , WLN_DAY1 , WLN_DAY2 , WLN_DAY3 , WLN_DAY4 , WLN_DAY5 , WLN_DAY6 , WLN_DAY7 FROM OUT_Tencent_N1_SALEDAYSHP WITH(NOLOCK) WHERE DDATE = DATEADD(DAY, -1, CONVERT(VARCHAR(100), GETDATE(), 23)) ''' cur3.execute(sql) result1 = cur3.fetchall() if result1 != []: log["STEP"] = "6-2 成功" # 数据写入文件 fname = "日销售数据_" + self.now + ".xlsx" writer = pd.ExcelWriter(fname) df1 = pd.DataFrame(result1, index=None, columns=["DDATE", "ORGNAME", "SHPID", "SHPNO", "SHPNAME", "DEC_ODATE", "ADDR", "NUM_PSN", "AREA", "SLAMT", "GST_QTY", "Uprice", "AmtPA", "AmtPP", "SLAMT_SVS", "GST_QTY_SVS", "Uprice_SVS", "PCT_PLU", "PCT_TBC", "AVG_SLAMT", "AVG_GST_QTY", "AVG_Uprice", "AVG_AmtPA", "AVG_AmtPP", "WLN_DAY1", "WLN_DAY2", "WLN_DAY3", "WLN_DAY4", "WLN_DAY5", "WLN_DAY6", "WLN_DAY7"]) df1.to_excel(writer, index=False, sheet_name='门店日销售表') cur3.close() else: log["STEP"] = "6-2 第一次提取数据为空" if log["STEP"] == "6-2 成功": # 提取第二份数据 cur4 = conn.cursor() sql = ''' SELECT DDATE , ORGNAME , CLSID , CLSNO , CLSNAME , SLAMT , VALUE , UNIT , AVG_SLAMT , AVG_VALUE , WLN_WEEK1 , WLN_WEEK2 , WLN_WEEK3 , WLN_WEEK4 , WLN_WEEK5 , WLN_WEEK6 FROM OUT_Tencent_N2_SALEDAYCLS WITH(NOLOCK) WHERE DDATE = DATEADD(DAY, -1, CONVERT(VARCHAR(100), GETDATE(), 23)) ''' cur4.execute(sql) result2 = cur4.fetchall() if result1 != []: log["STEP"] = "6-3 成功" df2 = pd.DataFrame(result2, index=None, columns=["DDATE", "ORGNAME", "CLSID", "CLSNO", "CLSNAME", "SLAMT", "VALUE", "UNIT", "AVG_SLAMT", "AVG_VALUE", "WLN_WEEK1", "WLN_WEEK2", "WLN_WEEK3", "WLN_WEEK4", "WLN_WEEK5", "WLN_WEEK6"]) df2.to_excel(writer, index=False, sheet_name='分类日销售表') cur4.close() conn.close() writer.save() else: log["STEP"] = "6-3 第二次提取数据为空"

 

  2. 数据类

  描述:用于将数据文件上传至腾讯微瓴API

  类的实例变量:应用ID,应用密钥,应用票据,域名,文件名,时间戳,随机数,动态密钥,物联票据,当前时间

  功能模块:generateSignature(),importFiles(),uploadFiles()

  第三方库及函数解析:

  (1)requests

    requests.get(url, params) 请求目标网站,“查询”信息

    requests.put(url, data) 请求目标网站,“增添” / “上传”信息

  (2)json

    json.loads() 将json格式数据转化为字典

  (3)struct

    struct.pack() 按照给定格式封装字符串

  (4)hashlib

    hashlib.sha1().hexdigest() 将指定字符串进行SHA1加密,并返回十六进制摘要

class Data(object):
    def __init__(self):
        self.appid = ''
        self.appKey = ''
        self.app_ticket = ''
        self.domain = ''
        self.file_name = ''
        self.timeStamp = str(int(round(time.time() * 1000)))
        self.randomNum = ''.join(str(random.choice(range(16))) for _ in range(10))     
        self.token = ''
        self.iotim_ticket = ''
        self.now = time.strftime("%Y%m%d_%H",time.localtime(time.time())) 


    def generateSignature(self):
        signature_str = struct.pack('=12sqq',self.appKey,int(self.timeStamp),int(self.randomNum))
        signature = hashlib.sha1(signature_str).hexdigest()
        return signature


    def importFiles(self):
        with open(self.file_name, "rb") as f:
            data = f.read()
        return data


    def uploadFiles(self):
        # 读取配置信息:应用标识、应用密码、应用票据、API接口域名、文件名
        # config  = ConfigObj('E:\业务建模\API接口\config.ini', encoding='UTF8')
        config  = ConfigObj('config.ini', encoding='UTF8')
        self.appid = config['API']['appid']
        self.appKey = bytes(config['API']['appKey'], encoding="utf8")
        self.app_ticket = config['API']['app_ticket']
        self.domain = config['API']['domain']
        self.file_name = config['API']['file_name'] + self.now + ".xlsx"
        
        # 接口鉴权
        params = {'time': self.timeStamp,
                  'num': self.randomNum,
                  'sig': self.generateSignature(),
                  'appid': self.appid,
                  'app_ticket': self.app_ticket}
        if log["STEP"] == "6-3 成功":
            try:
                res = requests.get(url=self.domain + "/common/ticket/loginByApp", params=params)
                # print("接口鉴权信息:" + res.content)
                msg = json.loads(res.content)
                self.token = msg[u'data'][u"token"]
                self.iotim_ticket = msg[u"data"][u"iotim_ticket"]
                log["STEP"] = "6-4 成功" 
            except:
                log["STEP"] = "6-4 接口鉴权失败" 

        # 获取临时上传地址
        params1 = {"token": self.token,
                   "iotim_ticket": self.iotim_ticket,
                   "file_name": self.file_name}
        if log["STEP"] == "6-4 成功":
            try:      
                response1 = requests.get(url=self.domain + "/common/fileservice/uploadTemporaryFile", params=params1)
                # print("临时上传:" + response1.content.decode("utf8"))
                content = json.loads(response1.content)
                upload_url = content[u'data'][u'upload_url']
                file_id = content[u'data'][u'file_id']
                log["STEP"] = "6-5 成功" 
            except:
                log["STEP"] = "6-5 上传地址获取失败"

        # 上传文件并获取临时下载地址
        params2 = {"token": self.token,
                   "iotim_ticket": self.iotim_ticket,
                   "file_id": file_id}
        if log["STEP"] == "6-5 成功":
            try:
                upload_imagdata = requests.put(url=upload_url, data=self.importFiles())
                response2 = requests.get(url=self.domain + "/common/fileservice/downloadTemporaryFile", params=params2)
                # print("临时下载::" + response2.content.decode("utf8"))
                log["STEP"] = "6-6 成功"
            except:
                log["STEP"] = "6-6 文件上传失败"

 

  3. 日志类

  描述:用于记录程序运行日志

  类的实例变量:域名,用户名,密码,数据库名,表名

  功能模块:connect()

 

class Log(object):
    def __init__(self):
        self.domain = ''
        self.username = ''
        self.password = ''
        self.database = ''
        self.table = ''


    def connect(self):
        # 读取配置信息:数据库服务器、用户名、密码、指定数据库
        # config  = ConfigObj('E:\业务建模\API接口\config.ini', encoding='UTF8')
        config  = ConfigObj('config.ini', encoding='UTF8')
        self.domain = config['sql_server']['domain']
        self.username = config['sql_server']['username']
        self.password = config['sql_server']['password']
        self.database = config['sql_server']['database']
        self.table = config['sql_server']['table']

        conn = pymssql.connect(self.domain, self.username, self.password, self.database)
        cur = conn.cursor()
        log["ETIME"] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
        keys = ', '.join(log.keys())
        values = ', '.join(['%s']*len(log))
        sql = '''
        UPDATE {table} SET
        '''.format(table=self.table)
        update = ','.join([" {key} = %s".format(key=key) for key in log])
        sql += update
        sql_2 = '''
        WHERE DDATE = '{date}' AND COUNTER = '{counter}'
        '''.format(date=date, counter=counter)
        sql += sql_2
        cur.execute(sql, tuple(log.values())*2)
        conn.commit()
        cur.close()
        conn.close()

  

  4. 执行

  描述:用于运行程序

if __name__ == '__main__':
    files = File()
    files.connect()

    if log["STEP"] == "6-3 成功":
        data = Data()
        data.uploadFiles()

    record = Log()
    record.connect()

 

四. 项目难点

  难点 1 :windows10开发环境下的.py文件部署至windows2012服务器,服务器为新,不便操作环境配置

  解决方法:将.py文件封装为.exe可执行程序,定时运行

    (1)安装 pyinstaller,终端写入

pip install pyinstaller

    (2)将.py文件封装,终端写入

pyinstaller -F XXX.py

    (3)解决封装过程中的Python递归深度异常,打开.spec文件写入

import sys

sys.setrecursionlimit(50000)

    (4)解决UTF8乱码问题,终端写入

chcp 65001

    (5)将.spec文件封装为.exe文件,终端写入

pyinstaller -F XXX.spec

    (6)将生成的.exe文件与.ini配置文件一并上传至服务器,存于同一文件夹中下,设定windows定时作业

 

五. 项目总结

  第一次尝试了编写Python将数据上传至API的脚本程序,也是Python的首个实战项目,学到了很多第三方模块的运用。程序的优化仍需加强,继续学习!

 

  

  

标签:实战,log,self,sql,API,WLN,time,数据传输,config
来源: https://www.cnblogs.com/poppylibrary/p/13679366.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有