读取数据范围 - LIMIT 和 OFFSET¶
现在你知道如何使用 .one()、.first() 和 session.get() 获取单行数据了。
你也知道如何使用 .where() 过滤并获取多行数据。
现在我们来看看如何只获取**某个范围内的结果**。
创建数据¶
我们将继续使用之前的代码,但会稍微修改 select_heroes() 函数,以简化示例并专注于我们在这里想要实现的目标。
我们将再次创建几个英雄,以便有一些数据可供选择
# Code above omitted 👆
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
# Code below omitted 👇
👀 完整文件预览
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
🤓 其他版本和变体
from typing import Optional
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
回顾全选¶
这是我们在 select() 示例中选择所有英雄的代码
# Code above omitted 👆
def select_heroes():
with Session(engine) as session:
statement = select(Hero)
results = session.exec(statement)
heroes = results.all()
print(heroes)
# Code below omitted 👇
👀 完整文件预览
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
🤓 其他版本和变体
from typing import Optional
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str
secret_name: str
age: Optional[int] = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
但这会一次性获取**所有**英雄,在一个可能有数千个英雄的数据库中,这可能会有问题。
带 LIMIT 的选择¶
我们目前数据库中有 7 个英雄。但我们也可能有很多,比如数千个,所以我们来限制结果,只获取前 3 个。
# Code above omitted 👆
def select_heroes():
with Session(engine) as session:
statement = select(Hero).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
# Code below omitted 👇
👀 完整文件预览
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
🤓 其他版本和变体
from typing import Optional
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
我们从 select() 获得的特殊 **select** 对象也有一个 .limit() 方法,我们可以用它来限制结果的数量。
在这种情况下,我们不再获取所有 7 行,而是将其限制为只获取前 3 行。
在命令行上运行程序¶
如果我们在命令行上运行它,它将输出
$ python app.py
// Previous output omitted 🙈
// Select with LIMIT
INFO Engine SELECT hero.id, hero.name, hero.secret_name, hero.age
FROM hero
LIMIT ? OFFSET ?
INFO Engine [no key 0.00014s] (3, 0)
// Print the heroes received, only 3
[
Hero(age=None, secret_name='Dive Wilson', id=1, name='Deadpond'),
Hero(age=None, secret_name='Pedro Parqueador', id=2, name='Spider-Boy'),
Hero(age=48, secret_name='Tommy Sharp', id=3, name='Rusty-Man')
]
太棒了!我们只得到了我们想要的 3 个英雄。
提示
我们稍后会再检查一下那个 SQL 代码。
带 OFFSET 和 LIMIT 的选择¶
现在我们可以限制结果,只获取前 3 个。
但想象一下,我们正在一个用户界面中,每次显示 3 个英雄的结果。
提示
这通常被称为“分页”,因为用户界面通常一次只显示一“页”,其中包含预定义数量的英雄。
然后你可以与用户界面交互以获取下一页,依此类推。
我们如何获取接下来的 3 个?
我们可以使用 .offset()
# Code above omitted 👆
def select_heroes():
with Session(engine) as session:
statement = select(Hero).offset(3).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
# Code below omitted 👇
👀 完整文件预览
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).offset(3).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
🤓 其他版本和变体
from typing import Optional
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).offset(3).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
它的工作方式是,我们从 select() 获取的特殊 **select** 对象具有像 .where()、.offset() 和 .limit() 这样的方法。
这些方法中的每一个都会**返回一个应用了相应更改的 select 语句对象**,这样我们就可以继续在其上调用更多方法(即链式调用),就像上面示例中我们同时使用了 .offset() 和 .limit() 那样。
**Offset** 意味着“跳过这么多行”,由于我们想跳过我们已经看过的那些,即前三行,所以我们使用 .offset(3)。
在命令行上使用 OFFSET 运行程序¶
现在我们可以在命令行上运行程序,它将输出
$ python app.py
// Previous output omitted 🙈
// Select with LIMIT and OFFSET
INFO Engine SELECT hero.id, hero.name, hero.secret_name, hero.age
FROM hero
LIMIT ? OFFSET ?
INFO Engine [no key 0.00020s] (3, 3)
// Print the 3 heroes received, the second batch
[
Hero(age=32, secret_name='Natalia Roman-on', id=4, name='Tarantula'),
Hero(age=35, secret_name='Trevor Challa', id=5, name='Black Lion'),
Hero(age=36, secret_name='Steve Weird', id=6, name='Dr. Weird')
]
选择下一批¶
然后要获取下一批 3 行,我们将跳过我们已经看到的所有行,即前 6 行
# Code above omitted 👆
def select_heroes():
with Session(engine) as session:
statement = select(Hero).offset(6).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
# Code below omitted 👇
👀 完整文件预览
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).offset(6).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
🤓 其他版本和变体
from typing import Optional
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).offset(6).limit(3)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
数据库目前**只有 7 行**,所以这个查询只能得到 1 行。
但别担心,当只剩下一行而我们却试图获取 3 行时,数据库并不会抛出错误(不像用越界的索引访问 Python 列表时那样)。
数据库知道我们想要**限制**结果的数量,但它不一定需要找到那么多结果。
在命令行上使用最后一批运行程序¶
如果我们在命令行中运行它,它将输出
$ python app.py
// Previous output omitted 🙈
// Select last batch with LIMIT and OFFSET
INFO Engine SELECT hero.id, hero.name, hero.secret_name, hero.age
FROM hero
LIMIT ? OFFSET ?
INFO Engine [no key 0.00038s] (3, 6)
// Print last batch of heroes, only one
[
Hero(age=93, secret_name='Esteban Rogelios', id=7, name='Captain North America')
]
带 LIMIT 和 OFFSET 的 SQL¶
你可能已经注意到新的 SQL 关键字 LIMIT 和 OFFSET。
在 SQL 中,你可以把它们放在语句其他部分之后,也就是语句的末尾:
SELECT id, name, secret_name, age
FROM hero
LIMIT 3 OFFSET 6
如果你在 **DB Browser for SQLite** 中尝试,你将得到相同的结果

将 LIMIT 和 OFFSET 与 WHERE 结合使用¶
当然,你也可以将 .limit() 和 .offset() 与 .where() 以及你稍后将学习的其他方法结合使用
# Code above omitted 👆
def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.age > 32).offset(1).limit(2)
results = session.exec(statement)
heroes = results.all()
print(heroes)
# Code below omitted 👇
👀 完整文件预览
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.age > 32).offset(1).limit(2)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
🤓 其他版本和变体
from typing import Optional
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.age > 32).offset(1).limit(2)
results = session.exec(statement)
heroes = results.all()
print(heroes)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()
在命令行上使用 LIMIT、OFFSET 和 WHERE 运行程序¶
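如果我们在命令行上运行它,它将找到数据库中所有年龄大于 32 的英雄。正常情况下,这会是 4 个英雄。
但我们从偏移量 1 之后开始(也就是跳过第一个),并将结果限制为只获取其后的 2 个。
请注意,这个查询没有指定 ORDER BY,因此行的返回顺序由数据库决定;下面的输出看起来是按 age 排列的(很可能是因为 SQLite 使用了 age 列上的索引),所以被跳过的“第一个”是 35 岁的 Black Lion。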
$ python app.py
// Previous output omitted 🙈
// Select with WHERE and LIMIT and OFFSET
INFO Engine SELECT hero.id, hero.name, hero.secret_name, hero.age
FROM hero
WHERE hero.age > ?
LIMIT ? OFFSET ?
INFO Engine [no key 0.00022s] (32, 2, 1)
// Print the heroes received, only 2
[
Hero(age=36, id=6, name='Dr. Weird', secret_name='Steve Weird'),
Hero(age=48, id=3, name='Rusty-Man', secret_name='Tommy Sharp')
]
回顾¶
无论你如何使用 .where() 或其他方法过滤数据,你都可以使用 .limit() 将查询限制为最多获取某个数量的结果。
同样,你可以使用 .offset() 跳过前几个结果。