FastAPI from
Absolute Zero
You can read Python. You build Angular APIs. This guide teaches you FastAPI the way a developer thinks โ not a data scientist. Fast, modern, async Python web framework.
- Flask โ micro, minimal, no validation, sync only. Simple scripts, not production APIs
- Django โ full stack with ORM, admin, templates. More than you need for just an API
- FastAPI โ API-first, async, validation built-in, typed, auto-documented. Best for modern APIs
Setup & Project
Structure
Get a production-ready project structure from day one โ not just a single main.py file.
# Create project folder mkdir myapi && cd myapi # Create virtual environment (ALWAYS do this) python -m venv venv # Activate it source venv/bin/activate # Mac/Linux venv\Scripts\activate # Windows # Install FastAPI + Uvicorn (ASGI server) pip install fastapi uvicorn[standard] # Install all common extras upfront pip install sqlalchemy alembic python-jose[cryptography] \ passlib[bcrypt] python-multipart httpx pytest # Save dependencies pip freeze > requirements.txt # Run the server uvicorn main:app --reload # main = filename, app = FastAPI instance
myapi/ โโโ app/ โ โโโ __init__.py โ โโโ main.py # FastAPI app creation + routers โ โโโ config.py # Settings (env vars, secrets) โ โโโ database.py # DB connection + session โ โโโ models/ # SQLAlchemy ORM models โ โ โโโ __init__.py โ โ โโโ user.py โ โโโ schemas/ # Pydantic request/response models โ โ โโโ __init__.py โ โ โโโ user.py โ โโโ routers/ # Route handlers (like Angular modules) โ โ โโโ __init__.py โ โ โโโ auth.py โ โ โโโ users.py โ โโโ services/ # Business logic (like Angular services) โ โ โโโ user_service.py โ โโโ dependencies.py # Shared dependencies (auth, db session) โโโ tests/ โ โโโ __init__.py โ โโโ test_users.py โโโ alembic/ # DB migrations โโโ .env โโโ requirements.txt โโโ Dockerfile
from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from app.routers import users, auth, products # Create the app โ like Angular's AppModule bootstrap app = FastAPI( title="My API", description="A production FastAPI app", version="1.0.0", docs_url="/docs", # Swagger UI โ visit this in browser! redoc_url="/redoc", # ReDoc alternative docs ) # CORS โ allow your Angular frontend to call this API app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:4200", "https://myapp.com"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Mount routers (like Angular RouterModule.forChild) app.include_router(auth.router, prefix="/auth", tags=["Authentication"]) app.include_router(users.router, prefix="/users", tags=["Users"]) app.include_router(products.router, prefix="/products", tags=["Products"]) # Startup / Shutdown events @app.on_event("startup") async def startup(): print("๐ API starting...") # connect to DB, warm up ML model, etc. @app.on_event("shutdown") async def shutdown(): print("๐ API shutting down...") # Health check endpoint @app.get("/") async def root(): return {"status": "healthy", "version": "1.0.0"}
Routes &
HTTP Methods
Every HTTP method, route organisation, routers and how decorators work.
from fastapi import APIRouter, HTTPException, status from app.schemas.item import ItemCreate, ItemUpdate, ItemResponse # Router = like Angular's RouterModule.forChild() routes array router = APIRouter() # GET โ fetch all (list) @router.get("/", response_model=list[ItemResponse]) async def get_items(): return [...] # return list from DB # GET โ fetch one by ID @router.get("/{item_id}", response_model=ItemResponse) async def get_item(item_id: int): item = db.get(item_id) if not item: raise HTTPException(status_code=404, detail="Item not found") return item # POST โ create new @router.post("/", response_model=ItemResponse, status_code=status.HTTP_201_CREATED) async def create_item(item: ItemCreate): return db.create(item) # PUT โ full update (replace everything) @router.put("/{item_id}", response_model=ItemResponse) async def update_item(item_id: int, item: ItemCreate): return db.replace(item_id, item) # PATCH โ partial update (only change some fields) @router.patch("/{item_id}", response_model=ItemResponse) async def partial_update_item(item_id: int, item: ItemUpdate): return db.update(item_id, item) # DELETE @router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_item(item_id: int): db.delete(item_id) # Return nothing โ 204 No Content
Pydantic Models
& Validation
Pydantic is FastAPI's superpower. Every request body and response is a Pydantic model โ automatically validated, serialized, and documented.
from pydantic import BaseModel, EmailStr, Field, field_validator, model_validator from typing import Optional from datetime import datetime from enum import Enum class RoleEnum(str, Enum): admin = "admin" user = "user" viewer = "viewer" # Request body: what the client SENDS to create a user class UserCreate(BaseModel): name: str = Field(..., min_length=2, max_length=50, example="Ravi Kumar") email: EmailStr # validates it's a real email format password: str = Field(..., min_length=8) age: Optional[int] = Field(None, ge=0, le=120) # ge=gte, le=lte role: RoleEnum = RoleEnum.user # Custom field validator @field_validator("name") @classmethod def name_must_not_have_numbers(cls, v: str) -> str: if any(c.isdigit() for c in v): raise ValueError("Name cannot contain numbers") return v.strip().title() # also transform: "ravi kumar" โ "Ravi Kumar" # Partial update: ALL fields optional (PATCH requests) class UserUpdate(BaseModel): name: Optional[str] = Field(None, min_length=2) email: Optional[EmailStr] = None age: Optional[int] = None # Response: what the server RETURNS (NEVER include password!) class UserResponse(BaseModel): id: int name: str email: str role: RoleEnum created_at: datetime class Config: from_attributes = True # allows creating from SQLAlchemy ORM object # Nested models class AddressCreate(BaseModel): street: str city: str pincode: str = Field(pattern=r"^\d{6}$") # regex validation class UserWithAddressCreate(UserCreate): address: AddressCreate # nested Pydantic model # Cross-field validation class PasswordChange(BaseModel): old_password: str new_password: str confirm_password: str @model_validator(mode="after") def passwords_match(self): if self.new_password != self.confirm_password: raise ValueError("Passwords do not match") return self
Path, Query &
Body Parameters
Three ways data comes into your API. FastAPI knows which is which from type hints alone.
from fastapi import APIRouter, Path, Query, Body, Header, Cookie, File, UploadFile from typing import Annotated, Optional router = APIRouter() # PATH PARAMETER โ /users/{user_id} @router.get("/users/{user_id}") async def get_user( user_id: Annotated[int, Path(title="User ID", ge=1)] ): return {"user_id": user_id} # QUERY PARAMETERS โ /items?skip=0&limit=10&search=phone @router.get("/items") async def list_items( skip: int = 0, # optional, default 0 limit: Annotated[int, Query(le=100)] = 10, # max 100 search: Optional[str] = None, # completely optional active: bool = True # bool: ?active=true or ?active=1 ): return {"skip": skip, "limit": limit, "search": search} # REQUEST BODY โ JSON in request body (POST/PUT) class Item(BaseModel): name: str price: float @router.post("/items") async def create_item(item: Item): # Pydantic model = request body return item # MULTIPLE BODIES โ when you need more than one model @router.post("/order") async def create_order( item: Item, user: UserCreate, # Expects: {"item": {...}, "user": {...}} discount: Annotated[float, Body(ge=0, le=1)] = 0 ): return {"item": item, "user": user} # HEADERS โ read HTTP headers @router.get("/headers") async def read_headers( user_agent: Annotated[Optional[str], Header()] = None, x_custom_header: Annotated[Optional[str], Header()] = None ): return {"user_agent": user_agent} # FILE UPLOAD @router.post("/upload") async def upload_file(file: UploadFile): contents = await file.read() return { "filename": file.filename, "size": len(contents), "content_type": file.content_type } # MULTIPLE FILES @router.post("/uploads") async def upload_files(files: list[UploadFile]): return [{"filename": f.filename} for f in files]
Responses &
Status Codes
from fastapi import FastAPI, Response, status from fastapi.responses import JSONResponse, HTMLResponse, FileResponse, StreamingResponse from typing import Union # 1. Return a dict โ FastAPI serializes it to JSON automatically @app.get("/simple") async def simple(): return {"message": "hello"} # 2. response_model โ filter what fields are returned @app.get("/users/{id}", response_model=UserResponse) async def get_user(id: int): user = get_from_db(id) # has password field return user # UserResponse omits password automatically! # 3. Multiple possible response models @app.get("/items/{id}", response_model=Union[ItemResponse, ErrorResponse]) async def get_item(id: int): ... # 4. Custom status codes @app.post("/items", status_code=201) # 201 Created @app.delete("/items/{id}", status_code=204) # 204 No Content # 5. JSONResponse with custom headers @app.get("/custom") async def custom(): return JSONResponse( content={"data": "value"}, status_code=200, headers={"X-Custom-Header": "my-value"} ) # 6. Stream large files (ML models, CSV downloads) @app.get("/download") async def download(): async def generate(): for chunk in read_large_file(): yield chunk return StreamingResponse(generate(), media_type="text/csv", headers={"Content-Disposition": 'attachment; filename="data.csv"'}) # 7. File response (static file from disk) @app.get("/report") async def get_report(): return FileResponse("reports/report.pdf", media_type="application/pdf") # 8. Background tasks (return response, then do something) from fastapi import BackgroundTasks @app.post("/send-email") async def send_email(email: str, bg: BackgroundTasks): bg.add_task(send_email_async, email) # runs AFTER response sent return {"message": "Email queued"}
Dependency Injection
FastAPI's DI system is like Angular's โ inject shared logic (DB sessions, auth, rate limiting) into any route function.
from fastapi import Depends, HTTPException, status, Security from fastapi.security import OAuth2PasswordBearer from app.database import SessionLocal from typing import Annotated # โโ DEPENDENCY 1: Database Session โโโโโโโโโโโโโโโโโ # Opens a DB session, passes it to route, closes when done def get_db(): db = SessionLocal() try: yield db # yield = give to route function finally: db.close() # ALWAYS close, even if error occurs # Use in route โ db is automatically provided @router.get("/users") async def get_users(db: Annotated[Session, Depends(get_db)]): return db.query(User).all() # โโ DEPENDENCY 2: Current User (Auth) โโโโโโโโโโโโโโ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token") async def get_current_user( token: Annotated[str, Depends(oauth2_scheme)], db: Annotated[Session, Depends(get_db)] ) -> User: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials", headers={"WWW-Authenticate": "Bearer"} ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id = payload.get("sub") if not user_id: raise credentials_exception except JWTError: raise credentials_exception user = db.get(User, int(user_id)) if not user: raise credentials_exception return user # โโ DEPENDENCY 3: Role-based access โโโโโโโโโโโโโโโ def require_role(*roles: str): async def check_role(user: Annotated[User, Depends(get_current_user)]): if user.role not in roles: raise HTTPException(403, "Insufficient permissions") return user return check_role # โโ Usage in routes โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ # Type aliases for cleaner code (like TypeScript type aliases) CurrentUser = Annotated[User, Depends(get_current_user)] AdminOnly = Annotated[User, Depends(require_role("admin"))] DBSession = Annotated[Session, Depends(get_db)] @router.get("/profile") async def profile(user: CurrentUser): # auto-injects current user return user @router.delete("/users/{id}") async def delete_user(id: int, admin: AdminOnly, db: DBSession): return db.delete(db.get(User, id)) # โโ DEPENDENCY 4: Class-based dependency โโโโโโโโโโ class Pagination: def __init__(self, skip: int = 0, limit: int = Query(10, le=100)): self.skip = skip self.limit = limit @router.get("/items") async def list_items(page: Annotated[Pagination, Depends(Pagination)]): return {"skip": page.skip, "limit": page.limit}
Middleware & CORS
from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware import time, logging, uuid # Custom middleware โ runs before AND after every request class LoggingMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): request_id = str(uuid.uuid4())[:8] start = time.time() # BEFORE the route handler logging.info(f"[{request_id}] {request.method} {request.url.path}") response: Response = await call_next(request) # run route handler # AFTER the route handler duration = (time.time() - start) * 1000 logging.info(f"[{request_id}] {response.status_code} โ {duration:.1f}ms") response.headers["X-Request-ID"] = request_id response.headers["X-Process-Time"] = f"{duration:.1f}ms" return response # Rate limiting middleware from collections import defaultdict import time class RateLimitMiddleware(BaseHTTPMiddleware): def __init__(self, app, max_requests=100, window=60): super().__init__(app) self.requests = defaultdict(list) self.max_requests = max_requests self.window = window async def dispatch(self, request: Request, call_next): ip = request.client.host now = time.time() self.requests[ip] = [t for t in self.requests[ip] if now - t < self.window] if len(self.requests[ip]) >= self.max_requests: return JSONResponse({"error": "Rate limit exceeded"}, status_code=429) self.requests[ip].append(now) return await call_next(request) # Register in main.py app.add_middleware(LoggingMiddleware) app.add_middleware(RateLimitMiddleware, max_requests=100, window=60) app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:4200"], # Angular dev server allow_credentials=True, allow_methods=["*"], allow_headers=["*"], )
Database with
SQLAlchemy
Full database setup: connection, ORM models, sessions, CRUD operations, migrations with Alembic.
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker import os DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./dev.db") # For PostgreSQL: postgresql://user:password@host:5432/dbname # For MySQL: mysql+pymysql://user:password@host:3306/dbname engine = create_engine( DATABASE_URL, connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {} ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base()
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Enum from sqlalchemy.orm import relationship from sqlalchemy.sql import func from app.database import Base class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) name = Column(String(50), nullable=False) email = Column(String(255), unique=True, index=True, nullable=False) password = Column(String(255), nullable=False) role = Column(Enum("admin", "user", "viewer"), default="user") is_active = Column(Boolean, default=True) created_at = Column(DateTime, server_default=func.now()) updated_at = Column(DateTime, onupdate=func.now()) # Relationship โ one user has many posts posts = relationship("Post", back_populates="author", cascade="all, delete") class Post(Base): __tablename__ = "posts" id = Column(Integer, primary_key=True) title = Column(String(200), nullable=False) content = Column(String(10000)) author_id = Column(Integer, ForeignKey("users.id"), nullable=False) author = relationship("User", back_populates="posts")
from sqlalchemy.orm import Session from app.models.user import User from app.schemas.user import UserCreate, UserUpdate from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class UserService: def __init__(self, db: Session): self.db = db def create(self, data: UserCreate) -> User: hashed = pwd_context.hash(data.password) user = User(**data.model_dump(exclude={"password"}), password=hashed) self.db.add(user) self.db.commit() self.db.refresh(user) return user def get_by_id(self, user_id: int) -> User | None: return self.db.query(User).filter(User.id == user_id).first() def get_by_email(self, email: str) -> User | None: return self.db.query(User).filter(User.email == email).first() def get_all(self, skip=0, limit=10) -> list[User]: return self.db.query(User).offset(skip).limit(limit).all() def update(self, user_id: int, data: UserUpdate) -> User: user = self.get_by_id(user_id) for key, val in data.model_dump(exclude_none=True).items(): setattr(user, key, val) self.db.commit() self.db.refresh(user) return user def delete(self, user_id: int): user = self.get_by_id(user_id) self.db.delete(user) self.db.commit()
JWT Authentication
Complete login/register/token system. Same pattern as every production API.
from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from jose import jwt, JWTError from datetime import datetime, timedelta import os router = APIRouter() SECRET_KEY = os.getenv("SECRET_KEY", "change-this-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE = 30 # minutes REFRESH_TOKEN_EXPIRE = 7 # days def create_token(data: dict, expires_in: timedelta) -> str: payload = data.copy() payload["exp"] = datetime.utcnow() + expires_in return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) # POST /auth/register @router.post("/register", response_model=UserResponse, status_code=201) async def register(data: UserCreate, db: DBSession): svc = UserService(db) if svc.get_by_email(data.email): raise HTTPException(400, "Email already registered") return svc.create(data) # POST /auth/token โ login (OAuth2 compatible, works with Swagger UI!) @router.post("/token") async def login(form: OAuth2PasswordRequestForm = Depends(), db: DBSession = ...): svc = UserService(db) user = svc.get_by_email(form.username) # username field = email if not user or not pwd_context.verify(form.password, user.password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password", headers={"WWW-Authenticate": "Bearer"} ) access_token = create_token( {"sub": str(user.id), "role": user.role}, timedelta(minutes=ACCESS_TOKEN_EXPIRE) ) refresh_token = create_token( {"sub": str(user.id), "type": "refresh"}, timedelta(days=REFRESH_TOKEN_EXPIRE) ) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer" } # POST /auth/refresh โ get new access token using refresh token @router.post("/refresh") async def refresh(refresh_token: str, db: DBSession): try: payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM]) if payload.get("type") != "refresh": raise HTTPException(401, "Invalid token type") user_id = payload.get("sub") user = UserService(db).get_by_id(int(user_id)) access_token = create_token({"sub": str(user.id)}, timedelta(minutes=30)) return {"access_token": access_token, "token_type": "bearer"} except JWTError: raise HTTPException(401, "Invalid refresh token") # GET /auth/me โ get current user profile @router.get("/me", response_model=UserResponse) async def me(user: CurrentUser): return user
Async / Await
Deep Dive
Understanding async is key to FastAPI's performance โ and to building WebSockets.
import asyncio import httpx # โ USE async when: calling other APIs, DB operations, file I/O @app.get("/weather") async def get_weather(city: str): async with httpx.AsyncClient() as client: response = await client.get(f"https://api.weather.com/v1/{city}") return response.json() # โ Run multiple async operations in PARALLEL (like Promise.all) @app.get("/dashboard") async def dashboard(user: CurrentUser, db: DBSession): users_task = asyncio.create_task(get_users_async(db)) orders_task = asyncio.create_task(get_orders_async(db)) metrics_task = asyncio.create_task(get_metrics_async()) # Wait for ALL to finish simultaneously โ not one by one! users, orders, metrics = await asyncio.gather( users_task, orders_task, metrics_task ) return {"users": users, "orders": orders, "metrics": metrics} # โ ๏ธ CPU-heavy work? Use run_in_executor to not block the event loop import concurrent.futures @app.post("/predict") async def predict(data: PredictRequest): loop = asyncio.get_event_loop() with concurrent.futures.ThreadPoolExecutor() as pool: # Run CPU-heavy ML prediction in a thread (won't block other requests) result = await loop.run_in_executor(pool, ml_model.predict, data.features) return {"prediction": result} # def vs async def โ both work! FastAPI handles both # def โ FastAPI runs in a thread (for CPU/blocking code) # async def โ runs in event loop (for I/O bound code)
Error Handling
from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse from fastapi.exceptions import RequestValidationError from pydantic import BaseModel # Custom exception class class AppException(Exception): def __init__(self, status_code: int, message: str, code: str = None): self.status_code = status_code self.message = message self.code = code # Handle your custom exceptions @app.exception_handler(AppException) async def app_exception_handler(request: Request, exc: AppException): return JSONResponse( status_code=exc.status_code, content={"error": exc.message, "code": exc.code} ) # Handle FastAPI's built-in 404, 401, etc @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): return JSONResponse( status_code=exc.status_code, content={"error": exc.detail, "status_code": exc.status_code} ) # Handle Pydantic validation errors (bad request body) @app.exception_handler(RequestValidationError) async def validation_error_handler(request: Request, exc: RequestValidationError): errors = [] for error in exc.errors(): errors.append({ "field": " โ ".join(str(x) for x in error["loc"][1:]), "message": error["msg"], "type": error["type"] }) return JSONResponse( status_code=422, content={"error": "Validation failed", "details": errors} ) # Handle all other unexpected errors @app.exception_handler(Exception) async def generic_exception_handler(request: Request, exc: Exception): logging.error(f"Unhandled error: {exc}", exc_info=True) return JSONResponse( status_code=500, content={"error": "Internal server error"} ) # Usage in routes @app.get("/users/{id}") async def get_user(id: int, db: DBSession): user = UserService(db).get_by_id(id) if not user: raise AppException(404, f"User {id} not found", code="USER_NOT_FOUND") return user
Testing with Pytest
import pytest from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from app.main import app from app.database import Base, get_db # Use an in-memory SQLite for tests TEST_DB_URL = "sqlite:///./test.db" engine = create_engine(TEST_DB_URL, connect_args={"check_same_thread": False}) TestSession = sessionmaker(bind=engine) @pytest.fixture(scope="function", autouse=True) def setup_db(): Base.metadata.create_all(bind=engine) # create tables yield Base.metadata.drop_all(bind=engine) # clean up after each test @pytest.fixture() def client(): def override_get_db(): db = TestSession() try: yield db finally: db.close() app.dependency_overrides[get_db] = override_get_db # swap real DB for test DB with TestClient(app) as c: yield c app.dependency_overrides.clear() # โโ TESTS โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ def test_register_user(client): response = client.post("/users/", json={ "name": "Ravi Kumar", "email": "[email protected]", "password": "securepassword123" }) assert response.status_code == 201 data = response.json() assert data["email"] == "[email protected]" assert "password" not in data # password must never be in response! def test_duplicate_email_fails(client): payload = {"name": "Test", "email": "[email protected]", "password": "password123"} client.post("/users/", json=payload) response = client.post("/users/", json=payload) # duplicate assert response.status_code == 400 def test_login_and_get_profile(client): # Register client.post("/users/", json={"name": "Test", "email": "[email protected]", "password": "password123"}) # Login login = client.post("/auth/token", data={"username": "[email protected]", "password": "password123"}) assert login.status_code == 200 token = login.json()["access_token"] # Use token profile = client.get("/auth/me", headers={"Authorization": f"Bearer {token}"}) assert profile.status_code == 200 assert profile.json()["email"] == "[email protected]"
Docker &
Deployment
# Multi-stage build for smaller image FROM python:3.11-slim AS builder WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir --user -r requirements.txt FROM python:3.11-slim WORKDIR /app COPY --from=builder /root/.local /root/.local COPY ./app ./app ENV PATH=/root/.local/bin:$PATH # Run with Gunicorn + Uvicorn workers (production-ready) CMD ["gunicorn", "app.main:app", \ "--workers", "4", \ "--worker-class", "uvicorn.workers.UvicornWorker", \ "--bind", "0.0.0.0:8000", \ "--timeout", "120"]
version: '3.9'
services:
api:
build: .
ports: ["8000:8000"]
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/mydb
- SECRET_KEY=your-super-secret-key
depends_on: [db, redis]
restart: unless-stopped
db:
image: postgres:15-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: mydb
volumes: ["pgdata:/var/lib/postgresql/data"]
redis:
image: redis:7-alpine
volumes:
pgdata:
Real-World Patterns
Pagination, filtering, background jobs, caching โ the stuff you'll use every day.
from pydantic import BaseModel from typing import Generic, TypeVar, Optional, List from math import ceil T = TypeVar('T') # Generic paginated response โ works for ANY model class Page(BaseModel, Generic[T]): items: List[T] total: int page: int page_size: int total_pages: int has_next: bool has_prev: bool def paginate(query, page: int, page_size: int) -> Page: total = query.count() items = query.offset((page - 1) * page_size).limit(page_size).all() total_pages = ceil(total / page_size) if total > 0 else 0 return Page(items=items, total=total, page=page, page_size=page_size, total_pages=total_pages, has_next=page < total_pages, has_prev=page > 1) # Usage: GET /users?page=2&page_size=10 @router.get("/users", response_model=Page[UserResponse]) async def get_users(page: int = 1, page_size: int = Query(10, le=100), db: DBSession): query = db.query(User).filter(User.is_active == True) return paginate(query, page, page_size) # Generic API response envelope class ApiResponse(BaseModel, Generic[T]): success: bool = True data: Optional[T] = None message: Optional[str] = None errors: Optional[list] = None @router.get("/users/{id}", response_model=ApiResponse[UserResponse]) async def get_user(id: int, db: DBSession): user = UserService(db).get_by_id(id) return ApiResponse(data=user, message="User found") # Filtering with query params class UserFilter(BaseModel): name: Optional[str] = None role: Optional[str] = None is_active: Optional[bool] = None @router.get("/users/search") async def search_users(filters: Annotated[UserFilter, Query()], db: DBSession): query = db.query(User) if filters.name: query = query.filter(User.name.ilike(f"%{filters.name}%")) if filters.role: query = query.filter(User.role == filters.role) if filters.is_active is not None: query = query.filter(User.is_active == filters.is_active) return query.all()