user_modules.py
from datetime import datetime
from sqlalchemy import Column,Integer,String,Boolean,DateTime,ForeignKey
from sqlalchemy.orm import relationship
from .connect import Base,session
class User(Base):
__tablename__='user'
id=Column(Integer,primary_key=True,autoincrement=True)
username=Column(String(20),nullable=False)
passwd=Column(String(50),nullable=False)
createtime=Column(DateTime,default=datetime.now)
_locked=Column(Boolean,default=False,nullable=False)
#在modules中写好查询条件,使用时直接调用
@classmethod
def all(cls):
return session.query(cls).all()
@classmethod
def by_name(cls,username):
return session.query(cls).filter_by(username=username).all()
@property
def locked(self):
return self._locked
def __repr__(self):
return '<User(id=%s,username=%s,passwd=%s,createtime=%s,_locked=%s)>'%(
self.id,
self.username,
self.passwd,
self.createtime,
self._locked
)
class UserDetails(Base):
__tablename__='user_details'
id=Column(Integer,primary_key=True,autoincrement=True)
id_card=Column(Integer,nullable=True,unique=True)
last_login=Column(DateTime)
login_num=Column(Integer,default=0)
user_id=Column(Integer,ForeignKey('user.id'))
#bakcref建立反向索引,
userdetails_for_foreignkey=relationship('User',backref='details',uselist=False,cascade='all')
def __self__(self):
return '<UserDetails(id=%s,id_card=%s,last_login=%s,login_num=%s,user_id=%s)>'%(
self.id,
self.id_card,
self.last_login,
self.login_num,
self.user_id
)
if __name__=='__main__':
Base.metadata.create_all()
query.py
from data.user_modules import User,session,UserDetails
#带条件查询
raw=session.query(User).filter_by(username='nanian').all()
raw=session.query(User).filter_by(username='nanian') #去掉.all()原生sql
raw=session.query(User).filter(User.username =='nanian').all()
raw=session.query(User.username).filter(User.username !='nanian').all()
raw=session.query(User.username).filter(User.username !='nanian').first()
raw=session.query(User.username).filter(User.username !='nanian').one() #如果前面查出的是多条数据则报错
print(session.query(User).get(2)) #根据主键查,会自己找主键
print(raw)
#限制查询结果数
print(session.query(User).filter(User.username!='nanian').limit(3).all())#前三行
print(session.query(User).filter(User.username!='nanian').offset(3).all())#第三行以后
print(session.query(User).filter(User.username!='nanian').slice(1,3).all())#2,3行
#排序
from sqlalchemy import desc
raw=session.query(User).filter(User.username !='nanian').order_by(User.username).all()
raw=session.query(User).filter(User.username !='nanian').order_by(desc(User.username).all()#逆序
#模糊查询 尽量少用模糊查询,效率低
from sqlalchemy import or_
raw=session.query(User).filter(User.username!='nanian').all()
raw=session.query(User).filter(User.username.like('n%').all()
raw=session.query(User).filter(User.username.notlike('n%').all()
raw=session.query(User).filter(User.username.in_(['nanian','a']).all()) #加下划线表示和python关键字作区分
raw=session.query(User).filter(User.username.isnot(None),User.passwd=='123').all()) #多条件
raw=session.query(User).filter(or_(User.username.isnot(None),User.passwd=='123')).all()) #或
raw=session.query(User).filter(User.username==None).all())
#聚合函数
from sqlalchemy import func,extract
print(session.query(User.passwd,func.count(User.id)).group_by(User.passwd).all())
print(session.query(User.passwd,func.count(User.id)).group_by(User.passwd).\
having(func.count(User.id)>1) all())
print( session.query(extract('minute',User.createtime).label('minute'),\
func.count(User.id)).group_by('minute').all() ) #提取分钟,按分钟分组
#多表查询
raw=session.query(User,UserDetails).all()
raw=session.query(User,UserDetails).filter(UserDetails.id==User.id) all()# cross join
raw=session.query(User.username,UserDetails.last_login).\
join(UserDetails, UserDetails.id==User.id) all()# inner join
raw=session.query(User.username,UserDetails.last_login).\
outerjoin(UserDetails, UserDetails.id==User.id) all()
# outer join代表left join 左连接,右连接将表反过来(sqlalchemy没有rightjoin),小表左连接右表效率高
q1=session.query(User.id)
q2=session.query(UserDetails.id)
raw=q1.union(q2).all()
from sqlalchemy import all_,any_
sql_0=session.query(UserDetails.last_login).subquery() #声明子表
raw=session.query(User).filter(User.createtime >all_(sql_0)).all()
raw=session.query(User).filter(User.createtime >any_(sql_0)).all()
#原生sql
sql_1='''
select * from `user`
'''
raw=session.execute(sql_1)
#print(raw,dir(raw))
#print(raw.fetchone())
#print(raw.fetchmany())
#print(raw.fetchall())
for i in raw:
print(i)