|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
- from sqlalchemy import exists, Column, Integer, String, ForeignKey, DateTime, Text, func
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker
- from conf.parseConfig import parseConf
- # 从配置文件中获取数据库信息
- host = parseConf.get_conf('MySQLInfo', 'host')
- port = parseConf.get_conf('MySQLInfo', 'port')
- dbname = parseConf.get_conf('MySQLInfo', 'dbname')
- usernm = parseConf.get_conf('MySQLInfo', 'usernm')
- passwd = parseConf.get_conf('MySQLInfo', 'passwd')
- # 连接数据库
- engine_str = "mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm, passwd, host, port, dbname)
- # 创建的数据库引擎
- engine = create_engine(engine_str, encoding='utf-8')
- #创建session类型
- DBSession = sessionmaker(bind=engine)
- # 创建session对象,进行增删改查:
- session = DBSession()
- # 实例化官宣模型 - Base 就是 ORM 模型
- Base = declarative_base()
- # 创建服务单表 继承Base基类
- class ServiceOrder(Base):
- __tablename__ = 'serviceOrderTable'
- serviceOrderId = Column(String(32), primary_key=True, comment='服务单ID')
- serviceDesc = Column(String(512), comment='服务说明')
- transferTimes = Column(String(32), comment='转派次数')
- # 创建更新时间,对数据的更新进行记录
- updateTime = Column(DateTime, server_default=func.now(), onupdate=func.now())
- def init_db():
- Base.metadata.create_all(engine)
- def drop_db():
- Base.metadata.drop_all(engine)
- if __name__ == "__main__":
- # 每次执行时 会判断表的存在性 对于数据库中不存在的表进行创建 已存在的表则可以直接进行增删改查
- init_db()
- ### 首先讲一下使用sqlalchemy执行原生的sql语句###
- # 方式一:
- res = session.execute('select * from ServiceOrder') # res是获取的对象
- all_res_list = res.fetchall() # all_res_list具体的结果 是列表
- print(all_res_list ) # 结果: [('数据提取',), ('非数据提取',)]
- # 方式二:
- conn = engine.connect()
- res = conn.execute('select * from ServiceOrder')
- all_res_list = res.fetchall()
- ### 使用创建好的session对象进行增删改查 ###
- # 插入单条数据
- # 创建新service0rder对象
- new_serviceorder = ServiceOrder(serviceOrderId='001', serviceDesc='ack', transferTimes='9')
- # 添加到session
- session.add(new_serviceorder)
- # 提交即保存到数据库
- session.commit()
- # 插入多条数据
- serviceorder_list = [ServiceOrder(serviceOrderId='002', serviceDesc='好的', transferTimes='9'),ServiceOrder(serviceOrderId='003', serviceDesc='起床', transferTimes='9')]
- session.add_all(serviceorder_list)
- session.commit()
- # session.close()
- # 查询
- # 查询是否存在 结果是布尔值
- it_exists = session.query(
- exists().where(ServiceOrder.serviceOrderId == '002')
- ).scalar()
- # 创建Query查询,filter是where条件
- # 调用one() first()返回唯一行,如果调用all()则已列表的形式返回所有行:
- server_order = session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == '003').first()
- print(server_order.serviceDesc)
- serciceorders = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == '好的').all()
- # 改 更新数据
- # 数据更新,将值为Mack的serviceDesc修改为Danny
- update_obj = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').update({"serviceDesc": "Danny"})
- # 或则
- update_objp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').first()
- update_objp.serviceDesc = 'Danny'
- session.commit()
- # 删除
- update_objk = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').delete()
- # 或则
- update_objkp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').one()
- update_objkp.delete()
- session.commit()
- session.close()
复制代码
|
|