在 SQLAlchemy 中提供了将数据库中的表自动映射为ORM类的扩展(sqlalchemy.ext.automap)。
基本用法最简单用例是将一个已存在的数据库映射为新的model。
from sqlalchemy.ext.automap import automap_base from sqlalchemy.orm import Session from sqlalchemy import create_engine Base = automap_base() # 我们假设数据库中存在两张表 `user` 和 `address`。 engine = create_engine("sqlite:///mydatabase.db") # 将数据库中的表全部映射到 Base.classes 中 Base.prepare(engine, reflect=True) # 映射类的默认名为数据表的名称 # 可以通过数据表名访问 User = Base.classes.user Address = Base.classes.address session = Session(engine) # 对象关联关系 session.add(Address(email_address="foo@bar.com", user=User(name="foo"))) session.commit() 从存在的 MetaData 中生成映射 from sqlalchemy import create_engine, MetaData, Table, Column, ForeignKey from sqlalchemy.ext.automap import automap_base engine = create_engine("sqlite:///mydatabase.db") # 实例化 MetaData 对象 metadata = MetaData() # 仅映射 only 中指定的表 metadata.reflect(engine, only=['user', 'address']) # 或者映射我们创建的Table 对象 Table('user_order', metadata, Column('id', Integer, primary_key=True), Column('user_id', ForeignKey('user.id')) ) # 传入 metadata MetaData.Base = automap_base(metadata=metadata) # 调用 prepare 方法 建立映射关系 Base.prepare() # 获取映射的类 User, Address, Order = Base.classes.user, Base.classes.address, Base.classes.user_order 映射特定的类 from sqlalchemy.ext.automap import automap_base from sqlalchemy import create_engine # automap base Base = automap_base() # 声明 user 表对应的 User 类 class User(Base): __tablename__ = 'user' # 重写表中定义的列 user_name = Column('name', String) # 也可修改数据表关联关系 address_collection = relationship("address", collection_class=set) # 映射 engine = create_engine("sqlite:///mydatabase.db") Base.prepare(engine, reflect=True) # 我们可以获取到之前的 address 表映射类 Address # 但是 User 类表为了 我们上面定义 Address = Base.classes.address u1 = session.query(User).first() print (u1.address_collection) a1 = session.query(Address).first() print (a1.user)根据以上特性我们可以编写一个类来实现数据表到模型类的关联。
from config import default_config, DB from sqlalchemy import MetaData, create_engine from sqlalchemy.ext.automap import automap_base def get_engine(): """ 数据库配置 """ url = f"{DB['protocol']}://{DB['user']}:{DB['password']}@{DB['addr']}:{DB['port']}/{DB['dbname']}?charset={DB['charset']}" engine = create_engine(url, pool_size=128, echo=default_config.DEBUG, pool_recycle=3600, pool_pre_ping=True) return engine def _get_model(tablename: str): """ 数据库自动映射为模型 根据数据库中表的名称获取相应的模型 :raise sqlalchemy.exc.InvalidRequestError 数据表不存在 :return: """ try: model = getattr(AutoBase.classes, tablename) except AttributeError: # 当获取的数据表在映射中不存在,则重新映射关系 tables = automap_tables.copy() tables.append(tablename) automap_metadata.reflect(bind=get_engine(), only=tables) AutoBase.prepare() model = getattr(AutoBase.classes, tablename) mapped_tables.append(tablename) return model # autoBase 基类适用于从数据库模型中自动映射 # 需要自动映射的表 automap_tables = ['qt_system_log', 'qt_chat_log'] # 已经映射的表 mapped_tables = automap_tables automap_metadata = MetaData() # 仅自动映射 automap_tables list 中列出的数据表 automap_metadata.reflect(bind=get_engine(), only=automap_tables) AutoBase = automap_base(metadata=automap_metadata) AutoBase.prepare() AutoBase.get_model = _get_model class AutoModel(object): """ 自动映射数据表到ORM模型 :property _model_class """ # 基础表名 base_tablename = '' def __init__(self, *args, **kwargs): """ 初始化 :param args: :param kwargs: """ self._model_class = self.get_model_class() self._model = self._model_class(*args, **kwargs) def __setattr__(self, key, value): """ setter 代理到 实际的 _model 上 """ if key.startswith('_'): super().__setattr__(key, value) else: setattr(self._model, key, value) def __getattr__(self, item): """ getter 代理 :param item: :return: """ return getattr(self._model, item) def get_model(self): """ 返回实际的模型 :return: """ return self._model @classmethod def get_tablename(cls, next_shard: bool = False): """ 返回数据库表名 :arg next_shard: 是否获取下一个`分表`名称 :return: """ raise NotImplementedError @classmethod def get_model_class(cls, base: bool = False): """ 获取实际的模型类 :arg base bool 是否获取基类模型 :raise AttributeError :return: """ tablename = cls.base_tablename if base else cls.get_tablename() return AutoBase.get_model(tablename=tablename)假如存在一个系统日志表 qt_system_log 每个月自动创建一个分表如: qt_system_log_202005.
class AccessLogModel(AutoModel): base_tablename = 'qt_system_log' """ qt_system_log 按年月分表 其格式如下: qt_system_log_202005 usage: ```python # insert db = get_connection() log = AccessLogModel(user_id=1, ip=123123123, method='GET', request='/api/auth/logout', response='{"success": true, "message": "ok"}', request_at=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), response_at=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) log.status = 200 db.add(log.get_model()) db.commit() ``` """ @classmethod def get_tablename(cls, next_shard: bool = False): d = datetime.datetime.now() if next_shard: d = d + datetime.timedelta(days=1) return cls.base_tablename + '_' + d.strftime("%Y%m") def __repr__(self): return f"<AccessLogModel table={self.get_tablename()}>"这样系统就可以按照当前时间操作对应的数据表了。
下一步我们来处理:自动创建分表的流程。
# 我们可以定时调用该方法,自动为系统中需要分表的模型建立分表 def shardTableCron(): engine = get_engine() # 需要分表的模型名称 models = [ AccessLogModel, ChatLog ] base_class = None for m in models: next_shard_table = m.get_tablename(next_shard=True) try: AutoBase.get_model(next_shard_table) except InvalidRequestError: try: base_class = AutoBase.get_model(m.base_tablename) except Exception: logger.error(f'table not exist: {m.base_tablename}') continue if not base_class: continue # 复制表 base_table = base_class.metadata.tables[m.base_tablename] metadata = MetaData() new_table = base_table.tometadata(metadata=metadata, name=next_shard_table) # 创建表 new_table.create(bind=engine) logger.info(f'create a table named: {next_shard_table}')