python-数据库封装工具
对常用数据库的封装:查询结果、执行状态和异常信息都保存在实例属性(status、result、title、Errinfo)中。目前封装了 SQL Server、MySQL、Oracle 三种数据库。
配置
我这边把各数据源的连接信息统一存放在一个 Oracle 数据库中;连接其他数据源时,需要先从这个 Oracle 库里读取对应的连接信息。
代码
# -*- coding: utf-8 -*-
# auth:lwh
import os
import sys
from re import split as str_split
from DBUtils.PooledDB import PooledDB
import cx_Oracle
import pymysql
import pymssql
# NLS_LANG is the Oracle client locale/charset setting; it is set here at
# import time, before any connection is opened.
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

# BASE_DIR is the parent of the directory that contains this file. It is put
# on sys.path, presumably so the TKUtils import that follows can be resolved.
_THIS_FILE = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(os.path.dirname(_THIS_FILE))
sys.path.append(BASE_DIR)
from TKUtils.args import DB_INFO # DB_INFO 为配置的oracle连接信息
'''
默认使用配置好的 Oracle 连接信息(DB_INFO)建立连接;
传入表名时,则从配置表中读取该表对应的数据库连接信息。
'''
class DBAccess:
    """Wrapper around one pooled connection to Oracle, MySQL or SQL Server.

    All query/execute methods are instance methods. Besides their return
    value, each call records its outcome on the instance:

    * ``status``  -- True if the last statement succeeded, False otherwise.
    * ``result``  -- rows of the last query, "执行成功" after a successful
      ``execSQL``, or an error message string after a failure.
    * ``title``   -- column names of the last successful query.
    * ``Errinfo`` -- the driver exception of the last failure, else False.
    """

    _DEFAULT_ORACLE_PORT = '1521'

    def __init__(self, dbInfo=None):
        """Open a pooled connection described by ``dbInfo``.

        :param dbInfo: one of
            * ``None`` -- use the module-level ``DB_INFO`` (looked up at call
              time so the credential string is not embedded in the signature
              shown by ``help()`` / introspection);
            * a ``"user/password@host[:port]/service"`` string -- connect to
              Oracle directly;
            * a ``dict`` -- an already-resolved connection record with the
              same keys the configuration lookup returns (``DB_TYPE``,
              ``IP_ADDRESS``, ``PORT``, ``USER_NAME``, ``PW``, ``SID`` ...);
            * anything else -- a data-source name that is resolved through
              ``TKUtils.confInfo.get_db_info``.
        :raises ValueError: malformed connection string, empty data-source
            name, or unsupported ``DB_TYPE``.
        """
        if dbInfo is None:
            dbInfo = DB_INFO
        self.status = False
        self.result = False
        self.title = False
        self.Errinfo = False
        # Pre-set so logout()/__del__ are safe even if construction fails.
        self.pool = None
        self.db = None
        self.cursor = None
        self.db_source = None

        if isinstance(dbInfo, str) and "/" in dbInfo:
            self._db_type = "ORACLE"
            config = self._tns_conf(dbInfo)
        else:
            if isinstance(dbInfo, dict):
                self._kwargs = dbInfo
            else:
                self.db_source = dbInfo
                self._kwargs = self._query_db_info()
            self._db_type = self._kwargs["DB_TYPE"]
            config = self._conf_for_type(self._db_type)

        self._config = config
        self.pool = PooledDB(**self._config)
        self._login()

    @staticmethod
    def _parse_tns(tns):
        """Split ``user/password@host[:port]/service`` into its four parts.

        The user ends at the first "/", the password at the last "@", so a
        password may itself contain "/", "@" or "|".

        :return: ``(user, password, host, service)``
        :raises ValueError: if the string does not have that shape.
        """
        user, _, remainder = tns.partition("/")
        password, at_sign, target = remainder.rpartition("@")
        host, _, service = target.partition("/")
        if not (user and at_sign and host and service):
            # The offending string is not echoed: it contains a password.
            raise ValueError(
                "connection string must look like "
                "'user/password@host[:port]/service'")
        return user, password, host, service

    @staticmethod
    def _normalize_db_type(db_type):
        """Return ``db_type`` upper-cased with spaces turned into underscores."""
        return str(db_type).strip().upper().replace(" ", "_")

    def _tns_conf(self, tns):
        """Build the Oracle pool config from a connection string."""
        self.UserName, self.PW, host, self.ServicName = self._parse_tns(tns)
        # An explicit ":port" wins; otherwise fall back to the Oracle default.
        self.IP, _, port = host.partition(":")
        return {
            'creator': cx_Oracle,
            'user': self.UserName,
            'password': self.PW,
            'dsn': "{0}:{1}/{2}".format(
                self.IP, port or self._DEFAULT_ORACLE_PORT, self.ServicName),
            'nencoding': 'utf8',
            'maxconnections': 1,  # maximum number of pooled connections
        }

    def _conf_for_type(self, db_type):
        """Pick the config builder matching ``db_type`` (case-insensitive)."""
        builders = {
            "MYSQL": self._mysql_conf,
            "SQL_SERVER": self._sqlserver_conf,
            "ORACLE": self._oracle_conf,
        }
        builder = builders.get(self._normalize_db_type(db_type))
        if builder is None:
            raise ValueError("unsupported database type " + str(db_type))
        return builder()

    def _password(self):
        """Return the password from whichever key the record uses."""
        return (self._kwargs.get("PW")
                or self._kwargs.get("PASSWORD")
                or self._kwargs.get("password"))

    def _mysql_conf(self):
        """Build the pool config for a MySQL data source."""
        return {
            'creator': pymysql,
            'host': self._kwargs.get("IP_ADDRESS"),
            # A missing PORT falls back to the MySQL default instead of int(None).
            'port': int(self._kwargs.get("PORT") or 3306),
            'user': self._kwargs.get("USER_NAME"),
            'password': self._password(),
            'database': self._kwargs.get("SERVICE_NAME") or self._kwargs.get("SID"),
            'charset': 'utf8',
            'maxconnections': 1,  # maximum number of pooled connections
            # 'cursorclass': pymysql.cursors.DictCursor  # would return dict rows
        }

    def _oracle_conf(self):
        """Build the pool config for an Oracle data source."""
        return {
            'creator': cx_Oracle,
            'user': self._kwargs["USER_NAME"],
            'password': self._password(),
            # format() copes with PORT stored as an int as well as a string.
            'dsn': "{0}:{1}/{2}".format(
                self._kwargs.get("IP_ADDRESS"),
                self._kwargs.get("PORT") or self._DEFAULT_ORACLE_PORT,
                self._kwargs.get("SID") or self._kwargs.get("SERVICE_NAME")),
            'nencoding': 'utf8',
            'maxconnections': 1,  # maximum number of pooled connections
        }

    def _sqlserver_conf(self):
        """Build the pool config for a SQL Server data source."""
        return {
            'creator': pymssql,
            'host': self._kwargs.get("IP_ADDRESS"),
            'port': self._kwargs.get("PORT") or 1433,
            'user': self._kwargs.get("USER_NAME"),
            'password': self._password(),
            'database': self._kwargs.get("SID"),
            'charset': 'utf8',
            'maxconnections': 1,  # maximum number of pooled connections
        }

    def _query_db_info(self):
        """Look up the connection record for ``self.db_source``.

        :raises ValueError: if no data-source name was given.
        """
        if not self.db_source:
            raise ValueError("数据源为空,不可查询")
        # Imported lazily, as in the original code.
        from TKUtils.confInfo import get_db_info
        return get_db_info(self.db_source)

    def _login(self):
        """Take a connection from the pool and open a cursor on it."""
        self.db = self.pool.connection()
        self.cursor = self.db.cursor()

    def logout(self):
        """Close the cursor and the connection; safe to call repeatedly."""
        cursor, self.cursor = getattr(self, "cursor", None), None
        connection, self.db = getattr(self, "db", None), None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if connection is not None:
                connection.close()

    def __del__(self):
        # Best-effort cleanup: errors raised during garbage collection or
        # interpreter shutdown cannot be handled by anyone, so they are dropped.
        try:
            self.logout()
        except Exception:
            pass

    def get_sys_date(self):
        """Placeholder for fetching the database server time (not implemented)."""
        pass

    def get_fields(self):
        """Placeholder for listing a table's column names and types (not implemented)."""
        pass

    def _record_failure(self, error):
        """Roll back and store ``error`` in status / result / Errinfo."""
        self.db.rollback()
        self.status = False
        self.result = "执行失败 错误信息:{0}".format(error)
        self.Errinfo = error

    def queryData(self, sql, params=None):
        """Run a query and fetch every row.

        :param sql: query text, optionally containing driver placeholders
        :param params: optional bind values for those placeholders
        :return: ``(rows, column_names)`` on success; on a driver error the
            exception object itself is returned (not raised)
        """
        try:
            if params is None:
                self.cursor.execute(sql)
            else:
                self.cursor.execute(sql, params)
            rows = self.cursor.fetchall()
            col_names = [column[0] for column in self.cursor.description]
        except (cx_Oracle.Error, pymysql.Error, pymssql.Error) as db_error:
            self._record_failure(db_error)
            return db_error
        self.status = True
        self.result = rows
        self.title = col_names
        self.Errinfo = False
        return rows, col_names

    def execSQL(self, sql, content=None):
        """Execute a data-modifying statement and commit it.

        :param sql: statement text, optionally containing driver placeholders
        :param content: ``None`` for a plain statement, a ``list`` of
            parameter sets for ``executemany``, or any other value as the
            bind parameters of a single ``execute``
        :return: ``1`` on success. On failure nothing is raised: an Oracle
            error yields ``(0, error)``, a MySQL / SQL Server error yields
            the error object (shapes kept for existing callers).
        """
        try:
            if content is None:
                self.cursor.execute(sql)
            elif isinstance(content, list):
                self.cursor.executemany(sql, content)
            else:
                self.cursor.execute(sql, content)
            self.db.commit()
        except cx_Oracle.Error as oracle_error:
            # cx_Oracle.DatabaseError derives from cx_Oracle.Error, so this
            # single handler covers both.
            print("Error", oracle_error)
            self._record_failure(oracle_error)
            return 0, oracle_error
        except (pymysql.Error, pymssql.Error) as db_error:
            self._record_failure(db_error)
            return db_error
        self.status = True
        self.result = "执行成功"
        self.Errinfo = False
        return 1

    def rows_as_dicts(self, content: list, col_names: list):
        """Convert query rows into dictionaries keyed by column name.

        :param content: rows as returned by ``queryData``
        :param col_names: column names, in the same order as each row
        :return: list with one ``dict`` per row
        """
        return [dict(zip(col_names, row)) for row in content]
if __name__ == '__main__':
    # Manual smoke test: connect to Oracle with a connection string, run a
    # "current time" query and print the parsed host.
    # NOTE(review): the credentials below are hard-coded for the demo; load
    # them from configuration before sharing or deploying this file.
    #
    # Equivalent probes for the other back ends:
    #   MySQL:      select SYSDATE() from dual
    #   SQL Server: select GETDATE()
    sql = "select sysdate from dual"
    target_connect = DBAccess(dbInfo='mdp_etl/mdp_20200421@127.0.0.1/mdp')
    try:
        # queryData reports failures through target_connect.status / .result
        # rather than raising, so inspect those to see the outcome.
        target_connect.queryData(sql)
        print(target_connect.IP)
    finally:
        # Release the cursor and connection explicitly instead of relying on
        # garbage collection.
        target_connect.logout()