Async SQLAlchemy with FastAPI


SQLAlchemy 1.4

Before we look at the example, here is some important information about the new SQLAlchemy 1.4 release:

In this post I will use the new async capabilities of the ORM layer, together with the new 2.0 style queries. We will create a simple FastAPI application with two routes: one for adding cities and their populations, and another that lists the most populated entries.

Prerequisites

The example assumes Python 3.9 and SQLAlchemy 1.4. Other dependencies include FastAPI with uvicorn, asyncpg (PostgreSQL database client for Python's asyncio) and typer for creating the table structure from the command line.

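The async support needs an asyncio-compatible database driver. This example uses PostgreSQL with asyncpg; which other databases are supported depends on the SQLAlchemy version.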

The code for this article can be found in the stribny/fastapi-asyncalchemy repo.

Model

There are new ways to declare SQLAlchemy models; however, I will use the subclass declarative style, which still works well and is less verbose.

Our City model in models.py will have id, name and population fields:

from sqlalchemy import Column
from sqlalchemy import String
from sqlalchemy import Integer
from fastapi_asyncalchemy.db.base import Base


class City(Base):
    __tablename__ = "cities"

    id = Column(Integer, autoincrement=True, primary_key=True, index=True)
    name = Column(String, unique=True)
    population = Column(Integer)

Async engine and model initialization

In base.py we will initialize the new SQLAlchemy engine with create_async_engine() and create an async session maker by passing it the new AsyncSession class:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker


DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost/asyncalchemy"


engine = create_async_engine(DATABASE_URL, echo=True)
Base = declarative_base()
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

Specifying echo=True when initializing the engine lets us see the generated SQL queries in the console. We should also disable the "expire on commit" behavior of sessions with expire_on_commit=False. By default, SQLAlchemy expires all objects after a commit and reloads them on the next attribute access. In async code, we don't want SQLAlchemy to issue such implicit SQL queries when we access already committed objects.
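
To make the effect of expire_on_commit more concrete, here is a minimal sketch. It is not part of the example application: it assumes a plain synchronous Session and an in-memory SQLite database as stand-ins for AsyncSession and PostgreSQL, and the helper name is_expired_after_commit is made up for illustration. With the default setting the committed object should be reported as expired, and with expire_on_commit=False it should not.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class City(Base):
    __tablename__ = "cities"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    population = Column(Integer)


def is_expired_after_commit(expire_on_commit: bool) -> bool:
    """Commit one City and report whether the session expired it."""
    # Assumption: in-memory SQLite and a sync Session stand in for the
    # article's PostgreSQL database and AsyncSession.
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine, expire_on_commit=expire_on_commit) as session:
        city = City(name="Prague", population=1_300_000)
        session.add(city)
        session.commit()
        # An expired object reloads its attributes on the next access,
        # which means an implicit SELECT behind the scenes.
        return inspect(city).expired


print("default setting:", is_expired_after_commit(True))
print("expire_on_commit=False:", is_expired_after_commit(False))
```

In a synchronous application the implicit reload is only a small cost, but with AsyncSession there is no place to await it during plain attribute access, which is why the setting matters here.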

Let's also define an async function to clear and recreate the database table that we will use later:

async def init_models():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

Dropping and creating tables through Base.metadata is a synchronous operation, and there is generally no reason for us to call it from an async function. This is just an example that shows how SQLAlchemy can run otherwise synchronous operations on an async connection with run_sync().
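
For comparison, this is roughly what the same table setup looks like without any async code. It is only a sketch under assumptions: it uses a regular engine from create_engine() with an in-memory SQLite database, whereas the application would need a synchronous PostgreSQL driver for this. The function name init_models_sync is hypothetical.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class City(Base):
    __tablename__ = "cities"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    population = Column(Integer)


def init_models_sync(engine) -> list:
    """Drop and recreate all tables, then return the table names found."""
    # The same two calls that init_models() passes to run_sync(),
    # here invoked directly on a synchronous engine.
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    return inspect(engine).get_table_names()


# Assumption: in-memory SQLite instead of the article's PostgreSQL URL.
print(init_models_sync(create_engine("sqlite://")))
```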

Async session object

Finally, we will create a FastAPI dependency function that creates sessions for us on demand:

from typing import AsyncGenerator


# Dependency
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session

With FastAPI's dependency system we can later use this function to inject new sessions into our routes.
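
The useful property of a dependency written with yield is that the code after the yield, here the exit of the async with block, runs once the request has been handled. The sketch below illustrates that ordering using only the standard library. FakeSession and handle_request are hypothetical stand-ins, and wrapping the generator in asynccontextmanager is only an approximation of what a framework might do, not a description of FastAPI's internals.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List

events: List[str] = []


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records its lifecycle."""

    async def __aenter__(self) -> "FakeSession":
        events.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        events.append("close")


async def get_session() -> AsyncGenerator[FakeSession, None]:
    # Same shape as the dependency above, with the fake session swapped in.
    async with FakeSession() as session:
        yield session


async def handle_request() -> None:
    # Drive the generator for the duration of one simulated request.
    async with asynccontextmanager(get_session)() as session:
        assert isinstance(session, FakeSession)
        events.append("handler")


def run_demo() -> List[str]:
    events.clear()
    asyncio.run(handle_request())
    return list(events)


print(run_demo())
```

The session is opened before the handler runs and closed after it finishes, so each request works with its own short-lived session.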

Service layer and the new query style

Let's now define two service functions in service.py: one for adding and another for retrieving cities.

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_asyncalchemy.models import City


async def get_biggest_cities(session: AsyncSession) -> list[City]:
    # 2.0 style: build the statement with select(), then execute it on the session
    query = select(City).order_by(City.population.desc()).limit(20)
    result = await session.execute(query)
    return result.scalars().all()


def add_city(session: AsyncSession, name: str, population: int) -> City:
    # Only adds the object to the session; the caller commits or rolls back
    new_city = City(name=name, population=population)
    session.add(new_city)
    return new_city

get_biggest_cities uses the new select() function to create the query. The query itself looks much the same as it would if it were built with session.query(). The difference is that instead of calling session.query(), we await session.execute(), which runs the query and returns a result object. Calling scalars() on the result yields the City objects themselves rather than rows, and all() collects them into a list.
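
One consequence of the 2.0 style is that the statement is an ordinary object that exists independently of any session, so it can be built and inspected without a database connection. A small sketch, assuming a local copy of the City model and a hypothetical helper named biggest_cities_query:

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class City(Base):
    __tablename__ = "cities"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    population = Column(Integer)


def biggest_cities_query(limit: int = 20):
    """Build the statement only; nothing is sent to a database here."""
    return select(City).order_by(City.population.desc()).limit(limit)


# str() renders the statement with SQLAlchemy's generic default dialect,
# so the exact SQL sent to PostgreSQL may differ in details.
sql = str(biggest_cities_query())
print(sql)
```

Printing the statement like this is a quick way to check a query while developing, in addition to the echo=True output of the engine.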

The function add_city just adds a new City object to the session. Nothing is written to the database at this point; we will manage the transaction in the controller (route).

Command line

Before we look at the routes, we need to create our table. We will simply define a function in main.py that will run init_models() on the event loop. I used Typer to create the CLI command, although when only one command is defined, its execution is reduced to invoking python main.py without additional arguments.

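import asyncio
import typer
from fastapi_asyncalchemy.db.base import init_models

cli = typer.Typer()


@cli.command()
def db_init_models():
    # Run the async init_models() to completion on a new event loop
    asyncio.run(init_models())
    print("Done")


if __name__ == "__main__":
    cli()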
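
The command itself is a regular synchronous function; asyncio.run() is what bridges it to the async code. Stripped of Typer and the database, the pattern looks like the sketch below, where init_models is a hypothetical stand-in that returns a string instead of touching any tables.

```python
import asyncio


async def init_models() -> str:
    # Hypothetical stand-in for the real init_models(), which would
    # drop and recreate the tables on the async engine.
    await asyncio.sleep(0)
    return "tables recreated"


def db_init_models() -> str:
    # asyncio.run() starts an event loop, runs the coroutine to
    # completion, closes the loop and returns the coroutine's result.
    return asyncio.run(init_models())


print(db_init_models())
```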

Routes

Let's now see the FastAPI routes (with CitySchema to represent the JSON exchanged):

from fastapi import FastAPI
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from fastapi_asyncalchemy.exceptions import DuplicatedEntryError
from fastapi_asyncalchemy.db.base import init_models
from fastapi_asyncalchemy.db.base import get_session
from fastapi_asyncalchemy import service


app = FastAPI()


class CitySchema(BaseModel):
    name: str
    population: int


@app.get("/cities/biggest", response_model=list[CitySchema])
async def get_biggest_cities(session: AsyncSession = Depends(get_session)):
    cities = await service.get_biggest_cities(session)
    return [CitySchema(name=c.name, population=c.population) for c in cities]


@app.post("/cities/")
async def add_city(city: CitySchema, session: AsyncSession = Depends(get_session)):
    new_city = service.add_city(session, city.name, city.population)
    try:
        await session.commit()
        return new_city
    except IntegrityError as ex:
        # The unique constraint on City.name was violated
        await session.rollback()
        raise DuplicatedEntryError("The city is already stored") from ex

We can see that a session can be injected with Depends, so each request to one of the routes gets its own new session. For retrieving data, the only change is that we now await our service function.

To demonstrate an error situation, I am catching an integrity error from the unique constraint defined earlier. We simply await session.commit() or await session.rollback().
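
The commit and rollback flow can be tried out without PostgreSQL or a running server. The following sketch makes several assumptions: it uses a synchronous Session with an in-memory SQLite database instead of AsyncSession, it merges the service function and the route logic into one add_city helper, and it defines a local DuplicatedEntryError as a stand-in for the application's exception class. The second insert of the same name should be rejected, leaving exactly one row stored.

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class City(Base):
    __tablename__ = "cities"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    population = Column(Integer)


class DuplicatedEntryError(Exception):
    """Hypothetical stand-in for the application's exception class."""


def add_city(session: Session, name: str, population: int) -> None:
    session.add(City(name=name, population=population))
    try:
        session.commit()
    except IntegrityError as ex:
        # Roll back so the session can be used again after the failure.
        session.rollback()
        raise DuplicatedEntryError("The city is already stored") from ex


def run_demo() -> tuple:
    # Assumption: in-memory SQLite instead of PostgreSQL.
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        add_city(session, "Prague", 1_300_000)
        try:
            add_city(session, "Prague", 1_300_000)
            duplicate_rejected = False
        except DuplicatedEntryError:
            duplicate_rejected = True
        count_query = select(func.count()).select_from(City)
        stored = session.execute(count_query).scalar_one()
    return duplicate_rejected, stored


print(run_demo())
```

The async version in the routes follows the same steps; the only difference is that commit() and rollback() are awaited.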

Final words

I am not an expert on asynchronous usage of SQLAlchemy; I just wanted to see what it would take to convert a standard FastAPI application with SQLAlchemy to the async world. Let me know if there is anything to add or change.

Last updated on 31.3.2021.