1. 安装依赖
pip install sqlalchemy[asyncio] asyncpg
2. 建立数据库连接和 Session
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
async def get_async_session():
async with async_session() as session:
yield session
3. 定义模型(只写你需要的字段)
from sqlalchemy import Column, Integer, String
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
content = Column(String)
4. 创建表(只需执行一次)
import asyncio
async def init_models():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
asyncio.run(init_models())
5. 常用操作示例
增
async def create_comment(session: AsyncSession, content: str):
comment = Comment(content=content)
session.add(comment)
await session.commit()
await session.refresh(comment)
return comment
查全部
from sqlalchemy.future import select
async def list_comments(session: AsyncSession):
result = await session.execute(select(Comment))
return result.scalars().all()
改
async def update_comment(session: AsyncSession, comment_id: int, new_content: str):
result = await session.execute(select(Comment).where(Comment.id == comment_id))
comment = result.scalars().first()
if comment:
comment.content = new_content
await session.commit()
await session.refresh(comment)
return comment
删
async def delete_comment(session: AsyncSession, comment_id: int):
result = await session.execute(select(Comment).where(Comment.id == comment_id))
comment = result.scalars().first()
if comment:
await session.delete(comment)
await session.commit()
return True
return False
6. 小贴士
await session.commit()
提交事务await session.refresh(obj)
刷新,获取数据库更新字段result.scalars()
获取 ORM 对象列表或单个对象
这就是最小可用的核心,按需用,别着急,慢慢熟悉。