当前位置 博文首页 > Python实现一个自助取数查询工具

    Python实现一个自助取数查询工具

    作者:Brook 时间:2021-08-10 18:38

    目录
    • 具体思路:
    • 一、数据库连接类
    • 二、数据提取主函数模块

    基于底层数据来开发不难,无非是将用户输入变量作为筛选条件,将参数映射到 sql 语句,并生成一个 sql 语句然后再去数据库执行

    最后再利用 QT 开发一个 GUI 界面,用户界面的点击和筛选条件,信号触发对应按钮与绑定的传参槽函数执行

    具体思路:

    一、数据库连接类

    此处利用 pandas 读写操作 oracle 数据库

    二、主函数模块

    1)输入参数模块,外部输入条件参数,建立数据库关键字段映射

    --注:读取外部 txt 文件,将筛选字段可能需要进行键值对转换

    2)sql 语句集合模块,将待执行的业务 sql 语句统一存放到这里

    3)数据处理函数工厂

    4)使用多线程提取数据

    一、数据库连接类

    cx_Oracle 是一个 Python 扩展模块,相当于 python 的 Oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 API 来实现 Oracle 数据库的查询和更新

    Pandas 是基于 NumPy 开发,为了解决数据分析任务的模块,Pandas 引入了大量库和一些标准的数据模型,提供了高效地操作大型数据集所需的方法类和函数

    pandas 调用数据库主要有 read_sql_table,read_sql_query,read_sql 三种方式

    本文主要介绍一下 Pandas 中 read_sql_query 方法的使用

    1:pd.read_sql_query()
    读取自定义数据,返还DataFrame格式,通过SQL查询脚本包括增删改查。
    pd.read_sql_query(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None,chunksize=None)
    sql:要执行的sql脚本,文本类型
    con:数据库连接
    index_col:选择返回结果集索引的列,文本/文本列表
    coerce_float:非常有用,将数字形式的字符串直接以float型读入
    parse_dates:将某一列日期型字符串转换为datetime型数据,与pd.to_datetime函数功能类似。
    params:向sql脚本中传入的参数,官方类型有列表,元组和字典。用于传递参数的语法是数据库驱动程序相关的。
    chunksize:如果提供了一个整数值,那么就会返回一个generator,每次输出的行数就是提供的值的大小
    
    read_sql_query()中可以接受SQL语句,DELETE,INSERT INTO、UPDATE操作没有返回值(但是会在数据库中执行),程序会抛出SourceCodeCloseError,并终止程序。SELECT会返回结果。如果想继续运行,可以try捕捉此异常。
     
    2:pd.read_sql_table()
    读取数据库中的表,返还DataFrame格式(通过表名)
    import pandas as pd
    pd.read_sql_table(table_name, con, schema=None,index_col=None, coerce_float=True, parse_dates=None, columns=None,chunksize=None)
     
    3:pd.read_sql()
    读数据库通过SQL脚本或者表名
    import pandas as pd
    pd.read_sql(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)

    以下创建连接 oracel 数据库的连接类 Oracle_DB

    主要提供 2 种操作数据的函数方法。

    import cx_Oracle
    # Pandas读写操作Oracle数据库
    import pandas as pd
    
    # 避免编码问题带来的乱码
    import os
    os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
    
    
    class Oracle_DB(object):
        def __init__(self):
            try:
                # 连接oracle
                # 方法1:sqlalchemy 提供的create_engine()
                # from sqlalchemy import create_engine
                # engine = create_engine('oracle+cx_oracle://username:password@ip:1521/ORCL')
                # #方法2:cx_Oracle.connect()
                self.engine = cx_Oracle.connect('username', 'password', 'ip:1521/database')
    
            except cx_Oracle.Error as e:
                print("Error %d:%s" % (e.args[0], e.args[1]))
                exit()
                
        # 查询部分信息
        def search_one(self, sql,sparm):
            try:
                # #查询获取数据用sql语句
                # 代传参数:sparm--查询指定字段参数
                df = pd.read_sql_query(sql, self.engine,params=sparm)
    
                self.engine.close()
    
            except Exception as e:
                return "Error " + e.args[0]
    
            return df
    
        # 查询全部信息
        def search_all(self, sql):
            try:
    
                # #查询获取数据用sql语句
    
                df = pd.read_sql_query(sql, self.engine)
    
                self.engine.close()
    
            except Exception as e:
                return "Error " + e.args[0]
    
            return df

    二、数据提取主函数模块

    cx_Oracle 是一个 Python 扩展模块,相当于 python 的 Oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 API 来实现 Oracle 数据库的查询和更新。

    1)外部输入参数模块

    txt 文本中,就包含一列数据,第一行列名,读取的时候忽略第一行

    #建立ID——编号字典
    def buildid():
        sqlid = """select * from b_build_info"""
        db = Oracle_DB()  # 实例化一个对象
        b_build_info = db.search_all(sqlid)
        ID_bUILDCODE = b_build_info.set_index("BUILDCODE")["ID"].to_dict()
        return ID_bUILDCODE
        
    #通过文本传入待导出数据清单
    def read_task_list():
        build_code=buildid()
        tasklist=[]
        is_first_line=True
        with open("./b_lst.txt") as lst:
            for line in lst:
                if is_first_line:
                    is_first_line=False
                    continue
                tasklist.append(build_code.get(line.strip('\n')))  #键值对转换
        return tasklist

    2)业务 sql 语句集合

    注意in后面{0}不要加引号,这里传入为元组,params 参数传入sparm

    = {'Start_time':'2021-04-01','End_time':'2021-05-01'},此处参数可根据需要改变

    def sql_d(lst):
        # 逐月数据
        sql_d_energy_item_month = """select * from d_energy_item_month
               where recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
               and recorddate < to_date(:End_time, 'yyyy-MM-dd')
               and  buildid  in {0}
               order by recorddate asc""".format(lst)
    
        # 逐月数据
        sql_d_energy_month = """select d.*,t.name from d_energy_month d join t_device_info t on d.branchid = t.id
               where d.recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
               and d.recorddate < to_date(:End_time, 'yyyy-MM-dd')
               and d.buildid = '{0}'
               order by d.recorddate asc""".format(lst)
    
        # 查询当日数据
        sql_energy_item_hour_cheak = """select * from d_energy_item_hour
                where trunc(sysdate)=trunc(recorddate)
                order by recorddate asc""".format(lst)
    
        sql_collection = [sql_d_energy_item_month, sql_d_energy_item_day, sql_d_energy_item_hour, sql_d_energy_month,
                          sql_d_energy_day, sql_d_energy_hour, sql_energy_hour_cheak]
                          #此处省略部分sql语句
        return sql_collection

    3)业务数据处理

    业务数据处理流程,原始数据后处理,这里不作介绍:

    def db_extranction(lst,sparm,sql_type):   
        """sql_type--输入需要操作的sql业务序号"""
        sql_=sql_d(lst)[sql_type]  #输出sql语句
        db = Oracle_DB()  # 实例化一个对象
        res=db.search_one(sql_,sparm)
        # 数据处理加工
        RES=Data_item_factory(res)  #此处省略
        # res = db.search_all(sql_d_energy_item_month)
        print(RES)
        return RES

    多线程提取数据部分,这里 tasklist 列表多线程提取数据

    import threading
    # Pandas读写操作Oracle数据库
    from tools.Data_Update_oracle import Oracle_DB
    import pandas as pd
    from concurrent import futures  
    
    if __name__ == '__main__':
        #外部传入
        tasklist= read_task_list()
        print(tasklist)
        # 输入时间查找范围参数,可手动修改
        sparm = {'Start_time':'2021-04-01','End_time':'2021-05-01'}
        lst = tuple(list(tasklist))
        
        #业务类型序号,可手动修改
        sql_type=0
        
        #全部提取
        db_extranction(lst,sparm,sql_type)  
    
        #多线程按字段分批提取
        方法一:使用threading模块的Thread类的构造器创建线程
        #threads=[threading.Thread(target=db_extranction,args=(lst,sparm,sql_type)) for lst in tasklist]
        # [threads[i].start() for i in range(len(threads))]
        
        方法二:使用python的concurrent库,这是官方基于 threading 封装,先安装该库
        # with futures.ThreadPoolExecutor(len(tasklist)) as executor:
        #     executor.map([db_extranction(lst,sparm,sql_type) for lst in tasklist],tasklist)  

    到此整个数据库取数工具开发流程介绍完毕,就差最后一步分享给小伙伴使用了,做成 GUI 应用此处不做详细介绍,构建独立的 python 环境,快速发布你的应用

    jsjbwy