SQLAlchemy中常用的查询方法[示例学习]

打印 上一主题 下一主题

主题 778|帖子 778|积分 2334

SQLAlchemy 是一个强大的 Python ORM(对象关系映射)工具,它提供了多种方法来执行数据库查询操作。以下是 SQLAlchemy 中常用的查询方法的总结:

  • session.query():使用 session.query(Model) 来创建一个查询对象,此中 Model 是你要查询的数据库模型类。
  • filter():在查询对象上使用 filter() 方法可以添加过滤条件,比方 filter(Model.column == value)。
  • all():使用 all() 方法可以获取查询的所有效果,并以列表的形式返回。
  • first():使用 first() 方法可以获取查询效果的第一条记载。
  • get():使用 get(primary_key_value) 方法可以根据主键值直接获取相应的记载。
  • filter_by():使用 filter_by(column=value) 方法可以根据指定列的数值进行过滤。
  • join():使用 join() 方法可以进行表的内毗连查询。
  • outerjoin():使用 outerjoin() 方法可以进行表的外毗连查询。
  • group_by():使用 group_by() 方法可以按指定列分组查询。
  • order_by():使用 order_by() 方法可以对查询效果进行排序。
  • count():使用 count() 方法可以统计查询效果的数量。
  • delete():使用 delete() 方法可以删除满足条件的记载。
  • update():使用 update() 方法可以更新满足条件的记载。
  1. from sqlalchemy.orm import declarative_base,sessionmaker
  2. from sqlalchemy import create_engine, ForeignKey
  3. from sqlalchemy import Column, Integer, String
  4. Base = declarative_base()
  5. class User(Base):
  6.     __tablename__ = 'users'
  7.    
  8.     id = Column(Integer, primary_key=True)
  9.     name = Column(String)
  10.     age = Column(Integer)
  11. class Address(Base):
  12.     __tablename__ = 'addresses'
  13.    
  14.     id = Column(Integer, primary_key=True)
  15.     user_id = Column(Integer, ForeignKey('users.id'))
  16.     street = Column(String)
  17.     city = Column(String)
  18. # 创建数据库连接
  19. engine = create_engine('sqlite:///users_addresses_example.db')
  20. Base.metadata.create_all(engine)
  21. # 创建会话
  22. Session = sessionmaker(bind=engine)
  23. session = Session()
  24. # 清空 User 表数据
  25. session.query(User).delete()
  26. session.commit()
  27. # 清空 Address 表数据
  28. session.query(Address).delete()
  29. session.commit()
  30. # 插入数据到 User 表
  31. user1 = User(name='Alice', age=25)
  32. user2 = User(name='Bob', age=30)
  33. user3 = User(name='Charlie', age=22)
  34. user4 = User(name='pemp', age=34)
  35. user5 = User(name='pita', age=39)
  36. session.add_all([user1, user2, user3, user4, user5])
  37. session.commit()
  38. # 插入数据到 Address 表
  39. address1 = Address(user_id=user1.id, street='123 Main St', city='New York')
  40. address2 = Address(user_id=user2.id, street='456 Park Ave', city='Los Angeles')
  41. address3 = Address(user_id=user3.id, street='789 Elm St', city='Chicago')
  42. session.add_all([address1, address2, address3])
  43. session.commit()
  44. # 进行表的联合查询
  45. result = session.query(User, Address).join(Address, User.id == Address.user_id).all()
  46. for user, address in result:
  47.     print(f"User: {user.name}, Age: {user.age}, Address: {address.street}, {address.city}")
  48. print('*'*40)
  49. from sqlalchemy import and_
  50. # 进行表的外连接查询
  51. # outerjoin外连接查询将返回左表User的所有记录,以及右表Address 表中与左表记录关联的数据,如果没有匹配的记录,则右表数据部分为 NULL。
  52. # result = session.query(User.id, Address.id).outerjoin(Address, User.id == Address.user_id).all()
  53. # # 找出没有关联的记录
  54. # unassociated_records = [(user_id, address_id) for user_id, address_id in result if address_id is None]
  55. # print("User 和 Address 表中没有关联的 ID:")
  56. # for user_id, _ in unassociated_records:
  57. #     print(f"独立User ID: {user_id}")
  58. # 进行表的外连接查询
  59. result = session.query(User, Address).outerjoin(Address, User.id == Address.user_id).all()
  60. for user, address in result:
  61.     if address is None:
  62.         print(f"独立User: {user.name}, Age: {user.age}, Address: None")
  63.     else:
  64.         print(f"User(关联ID): {user.name}, Age: {user.age}, Address: {address.street}, {address.city}")
  65. # 关闭会话
  66. session.close()
复制代码
  1. from sqlalchemy import create_engine, Column, Integer, String,select
  2. from sqlalchemy.orm import sessionmaker, declarative_base
  3. from sqlalchemy import func
  4. # 创建数据库连接
  5. engine = create_engine('sqlite:///users_example.db')
  6. Base = declarative_base()
  7. # 定义模型类
  8. class User(Base):
  9.     __tablename__ = 'users'
  10.     id = Column(Integer, primary_key=True)
  11.     name = Column(String)
  12.     age = Column(Integer)
  13. # 创建数据表
  14. Base.metadata.create_all(engine)
  15. # 创建 Session 类
  16. Session = sessionmaker(bind=engine)
  17. session = Session()
  18. # 事先清空数据
  19. session.query(User).delete()
  20. session.commit()
  21. # 插入更多数据
  22. users_data = [
  23.     {'name': 'Charlie', 'age': 28},
  24.     {'name': 'David', 'age': 32},
  25.     {'name': 'Eve', 'age': 27},
  26.     {'name': 'gemm', 'age': 43},
  27.     {'name': 'riyu', 'age': 43}
  28. ]
  29. for user_data in users_data:
  30.     user = User(**user_data)
  31.     session.add(user)
  32. session.commit()
  33. # 查询数据
  34. query = session.query(User)
  35. # 使用 filter() 方法添加过滤条件
  36. result = query.filter(User.age > 25).all()
  37. print("年龄大于25的用户:", [(user.name, user.age) for user in result])
  38. # 使用 func.group_concat() 函数将同一年龄下的用户名合并成一个字符串
  39. result = session.query(User.age, func.group_concat(User.name)).group_by(User.age).all()
  40. print("按年龄分组查询:")
  41. for age, names in result:
  42.     name_list = names.split(',')
  43.     print(f"年龄 {age} 的用户有:{', '.join(name_list)}")
  44. # # 使用 group_by() 方法按年龄分组查询
  45. # result = query.with_entities(User.age, func.group_concat(User.name)).group_by(User.age).all()
  46. # print("按年龄分组查询:")
  47. # for age, names in result:
  48. #     name_list = names.split(',')
  49. #     print(f"年龄 {age} 的用户有:{', '.join(name_list)}")
  50. # 使用 order_by() 方法对结果进行排序
  51. result = query.order_by(User.age).all()
  52. print("按年龄排序:", [(user.name, user.age) for user in result])
  53. # 统计查询结果的数量
  54. count = query.count()
  55. print("查询结果的数量:", count)
  56. # 删除满足条件的记录
  57. session.query(User).filter(User.name == 'Charlie').delete()
  58. session.commit()
  59. # 更新满足条件的记录
  60. session.query(User).filter(User.name == 'David').update({User.age: 35})
  61. session.commit()
  62. # 执行 SELECT 查询
  63. # 使用 select(User) 会创建一个 SELECT 查询,查询的对象是 User 这个表格(模型类)。这意味着你将检索 User 表中的所有列
  64. with engine.connect() as connection:
  65.     stmt = select(User).where(User.age > 25)
  66.     result = connection.execute(stmt)
  67.     for row in result:
  68.         print(row)
复制代码
---------------
sqlalchemy 中 select 函数的参数可以是一个或多个表达式,用于指定要查询的列。
它担当一个可迭代对象作为参数,该可迭代对象包含要选择的列或其他表达式。
以下是 select 函数的基本语法:
select(columns, whereclause=None, from_obj=[], **kwargs)


  • columns:要选择的列或其他表达式,可以是一个或多个。select(User模型类)
  • whereclause:可选参数,用于指定 WHERE 子句中的条件表达式。
  • from_obj:可选参数,用于指定查询的泉源表(FROM 子句)。
  • **kwargs:其他可选参数,比方 group_by、having、order_by 等。
在使用 select 函数时,你至少必要提供一个 columns 参数来指定要选择的列。其他参数都是可选的,根据实际必要来决定是否使用。
------------------------
session.scalars() 方法是 SQLAlchemy 中用于执行查询并返回标量值(Scalar)的方法。在 ORM 查询中,当你只必要获取一列数据的值而不是整个对象时,可以使用 session.scalars() 方法。 
  1. from sqlalchemy import select
  2. # 创建一个 SELECT 查询,选择名字为 "spongebob" 或 "sandy" 的用户的 id 列
  3. stmt = select(User.id).where(User.name.in_(["spongebob", "sandy"]))
  4. # 使用会话执行查询
  5. with Session(engine) as session:
  6.     # 执行查询并返回标量值
  7.     for user_id in session.scalars(stmt):
  8.         print(user_id)
复制代码
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

灌篮少年

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表