python-三种数据库封装的工具

tech2024-12-25  12

python-数据库封装工具

对常用的数据库的封装,数据的结果,异常采用类变量的方式。对 sqlserver, mysql,oracle三种数据库的封装。

配置

我这边是将数据库的连接信息存在了oracle数据库,其他数据源需要去数据库中去读取。

代码
# -*- coding: utf-8 -*- # auth:lwh import os import sys from re import split as str_split from DBUtils.PooledDB import PooledDB import cx_Oracle import pymysql import pymssql os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) from TKUtils.args import DB_INFO # DB_INFO 为配置的oracle连接信息 ''' 默认连接的是配置oracle链接信息 传入表名获取配置表中配置的数据库信息 ''' class DBAccess: ''' 三种数据库的封装 查询修改是静态方法,直接 DBPoolUtil.QueryData 进行查询 ''' def __init__(self, dbInfo=DB_INFO): self.status = False self.result = False self.title = False self.Errinfo = False config = {} if isinstance(dbInfo, str) and dbInfo.find("/") != -1: TNSlist = str_split(r"[/|@]", dbInfo) self.UserName, self.PW, self.IP, self.ServicName = TNSlist[0], TNSlist[1], TNSlist[2], TNSlist[3] config = { 'creator': cx_Oracle, 'user': self.UserName, 'password': self.PW, 'dsn': "/".join([":".join([self.IP, '1521']), self.ServicName]), 'nencoding': 'utf8', 'maxconnections': 1, # 连接池最大连接数量 } elif isinstance(dbInfo, dict): pass else: self.db_source = dbInfo self._kwargs = self._query_db_info() self._db_type = self._kwargs["DB_TYPE"] if self._db_type == "MYSQL": config = self._mysql_conf() elif self._db_type == 'SQL_SERVER' or self._db_type == "SQL SERVER": config = self._sqlserver_conf() elif self._db_type == "ORACLE" or self._db_type == "oracle": config = self._oracle_conf() else: raise Exception("unsupported database type " + self._db_type) self._config = config # print(config) self.pool = PooledDB(**self._config) self._login() def _mysql_conf(self): config = { 'creator': pymysql, 'host': self._kwargs.get("IP_ADDRESS"), 'port': int(self._kwargs.get("PORT")), 'user': self._kwargs.get("USER_NAME"), 'password': self._kwargs.get("PW") or self._kwargs.get("PASSWORD"), 'database': self._kwargs.get("SERVICE_NAME") or self._kwargs.get("SID"), 'charset': 'utf8', 'maxconnections': 1, # 连接池最大连接数量 # 'cursorclass': pymysql.cursors.DictCursor # 返回Dict结果 } return config def _oracle_conf(self): config = { 'creator': cx_Oracle, 'user': self._kwargs["USER_NAME"], 'password': self._kwargs["PW"] or self._kwargs["password"], 'dsn': "/".join( [":".join([self._kwargs.get("IP_ADDRESS"), self._kwargs.get("PORT")]), self._kwargs.get("SID")]), 'nencoding': 'utf8', 'maxconnections': 1, # 连接池最大连接数量 } return config def _sqlserver_conf(self): config = { 'creator': pymssql, 'host': self._kwargs.get("IP_ADDRESS"), 'port': self._kwargs.get("PORT") or 1433, 'user': self._kwargs.get("USER_NAME"), 'password': self._kwargs.get("PW"), 'database': self._kwargs.get("SID"), 'charset': 'utf8', 'maxconnections': 1, # 连接池最大连接数量 } return config def _query_db_info(self): if self.db_source: from TKUtils.confInfo import get_db_info db_info = get_db_info(self.tableName) return db_info else: assert Exception("数据源为空,不可查询") def _login(self): self.db = self.pool.connection() self.cursor = self.db.cursor() def logout(self): self.cursor.close() self.db.close() def __del__(self): self.logout() def get_sys_date(self): # 获取连接数据库时间 pass def get_fields(self): # 取得指定表的字段名和类型列表 pass def queryData(self, sql): ''' 此函数只返回了查询结果 :param sql: 查询sql :return: 查询结果 ''' try: self.cursor.execute(sql) result = self.cursor.fetchall() col_names = [i[0] for i in self.cursor.description] self.status = True self.result = result self.title = col_names return result, col_names except cx_Oracle.Error as data_error: self.db.rollback() self.status = False self.result = "执行失败 错误信息:{0}".format(data_error) return data_error except pymysql.Error as pymysqlError: self.db.rollback() self.status = False self.result = "执行失败 错误信息:{0}".format(pymysqlError) return pymysqlError except pymssql.Error as sqlserver_Error: self.db.rollback() self.status = False self.result = "执行失败 错误信息:{0}".format(sqlserver_Error) return sqlserver_Error def execSQL(self, sql, content=None): ''' :param sql: 执行sql、sql占位符 :param content: :return: ''' try: if content is None: self.cursor.execute(sql) elif type(content) is tuple: self.cursor.execute(sql, content) elif type(content) is list: self.cursor.executemany(sql, content) else: self.cursor.execute(sql, content) self.db.commit() self.status = True self.result = "执行成功" return 1 except cx_Oracle.Error as data_error: print("Error", data_error) self.db.rollback() self.status = False self.result = "执行失败 错误信息:{0}".format(data_error) return 0, data_error except cx_Oracle.DatabaseError as dDatabaseerror: print("dDatabaseerror", dDatabaseerror) self.db.rollback() self.status = False self.result = "执行失败 错误信息:{0}".format(dDatabaseerror) return 0, dDatabaseerror except pymysql.Error as Error: self.db.rollback() self.status = False self.result = "执行失败 错误信息:{0}".format(Error) return Error except pymssql.Error as sqlserver_Error: self.db.rollback() self.status = False self.result = "执行失败 错误信息:{0}".format(sqlserver_Error) return sqlserver_Error def rows_as_dicts(self, content: list, col_names: list): ''' 将数据库查询结果转为Dict :param content: 数据库查询结果 list :param col_names: :return: type : Dict ''' return list(dict(zip(col_names, row)) for row in content) if __name__ == '__main__': sql = """select sysdate from dual""" sql2 = "select SYSDATE() from dual" sql3 = "select GETDATE()" tableName = "CBONDPUT" target_connect = DBAccess(dbInfo='mdp_etl/mdp_20200421@127.0.0.1/mdp') res = target_connect.queryData(sql) print(target_connect.IP) # a = 'mdp_etl/mdp_20200421@127.0.0.1/mdp' # a.split('@') # print(target_connect.status) # print(target_connect.result) # del target_connect db
最新回复(0)