You've seen the "Hello World" FastAPI tutorials. This isn't one of those. This is the stack I use when I need something that won't wake me up at 3 AM — async PostgreSQL, proper migrations, containerized from day one.
The Stack
- FastAPI — async Python, automatic OpenAPI docs
- PostgreSQL 15 — battle-tested, row-level security, JSONB
- async SQLAlchemy 2.0 — modern async ORM patterns
- Alembic — versioned schema migrations
- Docker Compose — local parity with production
Project Structure
myapp/
├── app/
│   ├── main.py
│   ├── database.py
│   ├── models.py
│   ├── schemas.py
│   └── routers/
│       └── users.py
├── alembic/
│   └── versions/
├── alembic.ini
├── docker-compose.yml
└── requirements.txt
Database Layer
The single most important file is database.py. Get this wrong and everything downstream breaks:
import os

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

# Docker Compose sets DATABASE_URL; fall back to the local default otherwise.
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql+asyncpg://user:password@db/myapp")

engine = create_async_engine(DATABASE_URL, pool_size=10, max_overflow=20)
# async_sessionmaker produces AsyncSession objects by default.
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


class Base(DeclarativeBase):
    pass


async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()  # commit once, after the request handler finishes
        except Exception:
            await session.rollback()
            raise

The expire_on_commit=False is load-bearing. Without it, every attribute is expired on commit, and touching one afterwards triggers an implicit reload that raises MissingGreenlet in an async context.
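To see the commit-or-rollback flow of get_db() without a database, here is a minimal sketch using only the standard library. FakeSession and run_request are hypothetical names I made up for illustration, and this get_db takes the session as an argument instead of building it from the session factory. The wrapper only approximates how a framework drives a yield-style dependency.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records what was called."""

    def __init__(self):
        self.events = []

    async def commit(self):
        self.events.append("commit")

    async def rollback(self):
        self.events.append("rollback")


async def get_db(session):
    # Same control flow as the real dependency, minus the session factory.
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise


async def run_request(handler, session):
    """Enter the dependency, run the handler, then let the dependency finish."""
    async with asynccontextmanager(get_db)(session) as db:
        await handler(db)


async def ok_handler(db):
    pass  # a request that succeeds


async def failing_handler(db):
    raise ValueError("business rule violated")
```

A handler that returns normally leaves exactly one commit on the session. A handler that raises leaves exactly one rollback, and the original exception still reaches the caller.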
Your First Model
from datetime import datetime

from sqlalchemy import DateTime, String, func
from sqlalchemy.orm import Mapped, mapped_column

from .database import Base


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    # The database fills this in at insert time.
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

SQLAlchemy 2.0 Mapped annotations give you actual type inference: your IDE knows that user.email is a str.
Running Migrations
# Initialize Alembic with the async template (the default env.py assumes a sync driver)
alembic init -t async alembic

# In alembic/env.py, import your Base and models:
from app.database import Base
from app import models  # noqa: F401 (needed to register models)
target_metadata = Base.metadata

# Generate migration
alembic revision --autogenerate -m "create users table"

# Apply
alembic upgrade head

Always inspect the generated migration before applying. Autogenerate can miss things like PostgreSQL-specific indexes and check constraints, and it sees a table or column rename as a drop followed by an add.
Docker Compose
version: "3.9"
services:
  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: myapp
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d myapp"]
      interval: 5s
      timeout: 5s
      retries: 5
  app:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      db:
        condition: service_healthy
    environment:
      DATABASE_URL: postgresql+asyncpg://user:password@db/myapp
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
volumes:
  pgdata:

The service_healthy condition is critical. Without it, the app starts before Postgres is ready and crashes on first query. Spent an embarrassing amount of time debugging this the first time.
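If you run the app somewhere that has no healthcheck ordering, the same idea can live in the app itself as a startup retry loop. This is a rough sketch, not part of the stack above: wait_until_ready and tcp_probe are hypothetical helpers, the retry count and interval just mirror the healthcheck values, and an open TCP port is a weaker signal than pg_isready.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until_ready(
    probe: Callable[[], Awaitable[None]],
    retries: int = 5,
    interval: float = 5.0,
) -> bool:
    """Call `probe` until it succeeds or `retries` attempts are used up.

    `probe` is expected to raise OSError while the database is unreachable.
    """
    for attempt in range(1, retries + 1):
        try:
            await probe()
            return True
        except OSError:
            # Sleep between attempts, but not after the final one.
            if attempt < retries:
                await asyncio.sleep(interval)
    return False


async def tcp_probe(host: str = "db", port: int = 5432) -> None:
    """Open and close a TCP connection; raises OSError if nothing is listening."""
    _reader, writer = await asyncio.open_connection(host, port)
    writer.close()
    await writer.wait_closed()
```

You would call `await wait_until_ready(tcp_probe)` before creating the engine and refuse to start if it returns False.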
Connection Pooling in Production
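SQLAlchemy's defaults (pool_size=5, max_overflow=10) are too conservative for production load. For a typical API handling 100 concurrent requests:
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # connections kept open per process
    max_overflow=40,     # extra connections allowed during bursts
    pool_pre_ping=True,  # test each connection on checkout
    pool_recycle=3600,   # replace connections older than 1 hour
)

Size pool_size to the number of queries one process runs at the same time, not to the worker count. Each worker process gets its own pool, so the worst case is workers × (pool_size + max_overflow) connections, and that total has to stay below Postgres's max_connections (100 by default). max_overflow handles traffic spikes. pool_pre_ping=True is the difference between occasional 500s and clean failover after a database restart.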
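Before shipping pool settings, it is worth doing the arithmetic against the database's connection limit, because every worker process creates its own engine and pool. A small sketch of that check follows. The function names are mine, and the defaults assume a stock Postgres (max_connections=100, superuser_reserved_connections=3), so adjust them to your server.

```python
def worst_case_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Connections opened if every worker process fills its pool and overflow."""
    return workers * (pool_size + max_overflow)


def fits_budget(
    workers: int,
    pool_size: int,
    max_overflow: int,
    max_connections: int = 100,  # assumed Postgres default
    reserved: int = 3,           # assumed superuser_reserved_connections default
) -> bool:
    """True if the worst case stays within what ordinary clients may use."""
    available = max_connections - reserved
    return worst_case_connections(workers, pool_size, max_overflow) <= available
```

With pool_size=20, max_overflow=40 and an assumed four workers, the worst case is 4 × 60 = 240 connections, which does not fit a default Postgres. Either raise max_connections, shrink the pool, or put a pooler in front of the database.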
One Pattern That Changed Everything
The repository pattern sounds like enterprise overhead until you're six months in and want to test your business logic without a live database. Keep it simple: a module per model, with functions that take a session and return domain objects:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models import User


async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    # Returns None when no row matches; raises if more than one does.
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()


async def create_user(db: AsyncSession, email: str) -> User:
    user = User(email=email)
    db.add(user)
    await db.flush()  # assign ID without committing
    return user

The session commit happens in get_db(), not here. This keeps your data functions composable: you can call multiple creates in one request and they all share a single transaction.
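Here is roughly what "testing without a live database" can look like. It is a sketch under loud assumptions: User is a plain dataclass standing in for the mapped model, FakeSession is a hypothetical test double that implements only add(), flush() and commit(), and invite_pair is an invented piece of business logic.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    """Stand-in for the mapped User model so the sketch runs without SQLAlchemy."""
    email: str
    id: Optional[int] = None


class FakeSession:
    """Hypothetical test double: just enough of a session for create_user()."""

    def __init__(self):
        self.pending = []
        self.commits = 0
        self._next_id = 1

    def add(self, obj):
        self.pending.append(obj)

    async def flush(self):
        # Mimic the database handing out primary keys on flush.
        for obj in self.pending:
            if obj.id is None:
                obj.id = self._next_id
                self._next_id += 1

    async def commit(self):
        self.commits += 1


async def create_user(db, email: str) -> User:
    # Same steps as the repository function: add, flush, return.
    user = User(email=email)
    db.add(user)
    await db.flush()
    return user


async def invite_pair(db, first: str, second: str):
    """Hypothetical business function: two creates sharing one transaction."""
    return await create_user(db, first), await create_user(db, second)
```

Because the data functions never commit, a test can call invite_pair with a FakeSession, check that both users got IDs, and confirm that commits is still zero.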
Ship it. Iterate. The stack scales to millions of rows before you need to think about anything else.