Async SQLAlchemy with FastAPI
SQLAlchemy 1.4
Before we look at the example, there are some important information about the new SQLAlchemy 1.4 release:
- SQLAlchemy 1.4 presents changes that will be finalized in SQLAlchemy 2.0.
- SQLAlchemy unifies Core and ORM APIs for consistency.
- Both Core and ORM now support async with asyncio, but this feature is not production ready yet. It also comes with some limitations on what can we do with the ORM, notably in respect to lazy loading.
- There is a new way to create queries, called 2.0 style, since it will be the style of the next major version of SQLAlchemy. Spoiler alert: the syntax is basically the same, but the query is not called on the session object directly.
- There is now support for declarative mapping with dataclasses and attrs.
In this post I will use the new async capabilities of the ORM layer, together with the new 2.0 style queries. We will create a simple FastAPI application with two routes. One for adding cities and their population, and another that will list the most populated entries.
Prerequisites
The example assumes Python 3.9 and SQLAlchemy 1.4. Other dependencies include FastAPI with uvicorn, asyncpg (PostgreSQL database client for Python's asyncio) and typer for creating the table structure from the command line.
The async support can be used only with PostgreSQL database at the moment.
The code for this article can be found in the stribny/fastapi-asyncalchemy repo.
Model
There are new ways how to declare SQLAlchemy models, however I will use the subclass declarative style that still works well and is less verbose.
Our City
model in models.py
will have id
, name
and population
fields:
from sqlalchemy import Column
from sqlalchemy import String
from sqlalchemy import Integer
from fastapi_asyncalchemy.db.base import Base
class City(Base):
__tablename__ = "cities"
id = Column(Integer, autoincrement=True, primary_key=True, index=True)
name = Column(String, unique=True)
population = Column(Integer)
Async engine and model initialization
In base.py
we will initialize the new SQLAlchemy engine with create_async_engine()
and create an async session maker by passing it the new AsyncSession
class:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost/asyncalchemy"
engine = create_async_engine(DATABASE_URL, echo=True)
Base = declarative_base()
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Specifying echo=True
upon the engine initialization will enable us to see generated SQL queries in the console. We should disable the "expire on commit" behavior of sessions with expire_on_commit=False
. This is because in async settings, we don't want SQLAlchemy to issue new SQL queries to the database when accessing already commited objects.
Let's also define an async function to clear and recreate the database table that we will use later:
async def init_models():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
Dropping and creating tables from Base.metadata
doesn't run async by default and there is generally no reason for us to call it within an async function. This is just an example that shows how SQLAlchemy can run otherwise sync operations with run_sync()
.
Async session object
Finally, we will create a FastAPI Dependency function to create future sessions for us on demand:
# Dependency
async def get_session() -> AsyncSession:
async with async_session() as session:
yield session
With FastAPI's dependency system we can later use this function to inject new sessions to our routes.
Service layer and the new query style
Let's now define two service functions in service.py
: one for adding and another for retrieving cities.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_asyncalchemy.models import *
async def get_biggest_cities(session: AsyncSession) -> list[City]:
result = await session.execute(select(City).order_by(City.population.desc()).limit(20))
return result.scalars().all()
def add_city(session: AsyncSession, name: str, population: int):
new_city = City(name=name, population=population)
session.add(new_city)
return new_city
The get_biggest_cities
uses the new select
function to create the query. The query looks the same as it would be run directly on the session object. But now, instead of calling session().query()
, we await session.execute()
that will execute the query and hold the results. The scalars()
method provides access to the results.
The function add_city
just puts a new City
object to the session - we will manage the transaction in the controller (route).
Command line
Before we look at the routes, we need to create our table. We will simply define a function in main.py
that will run init_models()
on the event loop. I used Typer to create the CLI command, although when only one command is defined, its execution is reduced to invoking python main.py
without additional arguments.
import asyncio
import typer
cli = typer.Typer()
@cli.command()
def db_init_models():
asyncio.run(init_models())
print("Done")
if __name__ == "__main__":
cli()
Routes
Let's now see the FastAPI routes (with CitySchema
to represent the JSON exchanged):
from fastapi import FastAPI
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from fastapi_asyncalchemy.exceptions import DuplicatedEntryError
from fastapi_asyncalchemy.db.base import init_models
from fastapi_asyncalchemy.db.base import get_session
from fastapi_asyncalchemy import service
app = FastAPI()
class CitySchema(BaseModel):
name: str
population: int
@app.get("/cities/biggest", response_model=list[CitySchema])
async def get_biggest_cities(session: AsyncSession = Depends(get_session)):
cities = await service.get_biggest_cities(session)
return [CitySchema(name=c.name, population=c.population) for c in cities]
@app.post("/cities/")
async def add_city(city: CitySchema, session: AsyncSession = Depends(get_session)):
city = service.add_city(session, city.name, city.population)
try:
await session.commit()
return city
except IntegrityError as ex:
await session.rollback()
raise DuplicatedEntryError("The city is already stored")
We can see that a session can be injected with Depends
. So a call to each of the routes will create a new session. For retrieving data, the only change is that now we want to await
our service function.
To demonstrate an error situation, I am catching an integrity error from the unique constraint defined earlier. We simply await session.commit()
or await session.rollback()
.
Final words
I am not an expert on asynchronous usage of SQLAlchemy, I just wanted to see what it would take to convert a standard FastAPI application with SQLAlchemy to the async world. Let me know if there is anything to add or change.
Last updated on 31.3.2021.