Ctrl K

FastAPI Authentication and Authorization

A Docker based FastAPI project that separates password authentication, JWT current-user resolution, role based authorization, and owner based permissions.

This setup creates a focused FastAPI authentication and authorization example. It shows how to register users, hash passwords, log in with JWT tokens, resolve the current user, protect routes, check admin role access, and enforce owner based permissions.

Project structure

Create a separate project folder for the authentication and authorization example. This project keeps auth logic, database setup, models, schemas, and routes in separate files.

04-authentication-authorization/
  app/
    __init__.py
    auth.py
    database.py
    main.py
    models.py
    schemas.py
  requirements.txt
  Dockerfile
  compose.yaml
mkdir 04-authentication-authorization
cd 04-authentication-authorization
mkdir app
touch app/__init__.py

Create the database connection

Create the database setup file at app/database.py. This example uses SQLite in a Docker volume so the auth flow can run without a separate database container.

app/database.py
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:////data/app.db")

connect_args = {}

if DATABASE_URL.startswith("sqlite"):
    connect_args = {
        "check_same_thread": False
    }

engine = create_engine(
    DATABASE_URL,
    connect_args=connect_args,
)

SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
)

Base = declarative_base()


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Create the SQLAlchemy models

Create the database models file at app/models.py. User stores login identity, password hash, and role. Post belongs to a user and is used for owner based authorization.

app/models.py
from datetime import UTC, datetime

from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship

from app.database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(120), unique=True, nullable=False, index=True)
    hashed_password = Column(String(255), nullable=False)
    is_admin = Column(Boolean, nullable=False, default=False)

    posts = relationship(
        "Post",
        back_populates="author",
        cascade="all, delete-orphan",
    )


class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(100), nullable=False)
    content = Column(Text, nullable=False)
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        nullable=False,
        index=True,
    )
    date_posted = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(UTC),
        nullable=False,
    )

    author = relationship(
        "User",
        back_populates="posts",
    )

Create the Pydantic schemas

Create the schema file at app/schemas.py. These schemas define registration input, token output, user output, post creation, post update, and post response.

app/schemas.py
from datetime import datetime

from pydantic import BaseModel, ConfigDict, EmailStr, Field


class Token(BaseModel):
    access_token: str
    token_type: str


class UserCreate(BaseModel):
    username: str = Field(min_length=1, max_length=50)
    email: EmailStr = Field(max_length=120)
    password: str = Field(min_length=8, max_length=128)


class UserResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str
    email: EmailStr
    is_admin: bool


class PostCreate(BaseModel):
    title: str = Field(min_length=1, max_length=100)
    content: str = Field(min_length=1)


class PostUpdate(BaseModel):
    title: str | None = Field(default=None, min_length=1, max_length=100)
    content: str | None = Field(default=None, min_length=1)


class PostResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    content: str
    user_id: int
    date_posted: datetime

The request schema accepts a plain password only during registration. The database stores hashed_password, and the response schema never exposes password fields.

Create the authentication utilities

Create the authentication utility file at app/auth.py. This file handles password hashing, password verification, JWT creation, current user resolution, and admin authorization.

app/auth.py
import os
from datetime import UTC, datetime, timedelta
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.orm import Session

from app import models
from app.database import get_db

SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-change-me")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="auth/token",
)


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(
        plain_password,
        hashed_password,
    )


def get_password_hash(password):
    return pwd_context.hash(password)


def create_access_token(data):
    to_encode = data.copy()
    expire = datetime.now(UTC) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode.update(
        {
            "exp": expire,
        }
    )

    return jwt.encode(
        to_encode,
        SECRET_KEY,
        algorithm=ALGORITHM,
    )


def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[Session, Depends(get_db)],
):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={
            "WWW-Authenticate": "Bearer",
        },
    )

    try:
        payload = jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[ALGORITHM],
        )

        username = payload.get("sub")

        if username is None:
            raise credentials_exception
    except JWTError as exc:
        raise credentials_exception from exc

    user = db.execute(
        select(models.User).where(models.User.username == username)
    ).scalar_one_or_none()

    if user is None:
        raise credentials_exception

    return user


def require_admin(
    current_user: Annotated[models.User, Depends(get_current_user)],
):
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )

    return current_user

Create the FastAPI app

Create the main application file at app/main.py. This file creates the tables, registers users, logs users in, protects routes, checks admin access, and checks post ownership.

app/main.py
from contextlib import asynccontextmanager
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import or_, select
from sqlalchemy.orm import Session

from app import models
from app.auth import (
    create_access_token,
    get_current_user,
    get_password_hash,
    require_admin,
    verify_password,
)
from app.database import Base, engine, get_db
from app.schemas import (
    PostCreate,
    PostResponse,
    PostUpdate,
    Token,
    UserCreate,
    UserResponse,
)


@asynccontextmanager
async def lifespan(_app):
    Base.metadata.create_all(bind=engine)
    yield


app = FastAPI(
    title="04 Authentication Authorization",
    lifespan=lifespan,
)


@app.get("/")
def home():
    return {
        "message": "FastAPI authentication and authorization example"
    }


@app.post(
    "/auth/register",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
)
def register(
    payload: UserCreate,
    db: Annotated[Session, Depends(get_db)],
):
    existing_user = db.execute(
        select(models.User).where(
            or_(
                models.User.username == payload.username,
                models.User.email == payload.email,
            )
        )
    ).scalar_one_or_none()

    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username or email already exists",
        )

    user_count = db.execute(
        select(models.User)
    ).scalars().all()

    user = models.User(
        username=payload.username,
        email=payload.email,
        hashed_password=get_password_hash(payload.password),
        is_admin=len(user_count) == 0,
    )

    db.add(user)
    db.commit()
    db.refresh(user)

    return user


@app.post("/auth/token", response_model=Token)
def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Annotated[Session, Depends(get_db)],
):
    user = db.execute(
        select(models.User).where(models.User.username == form_data.username)
    ).scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )

    if not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )

    access_token = create_access_token(
        data={
            "sub": user.username,
        }
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
    }


@app.get("/users/me", response_model=UserResponse)
def read_current_user(
    current_user: Annotated[models.User, Depends(get_current_user)],
):
    return current_user


@app.get("/admin/users", response_model=list[UserResponse])
def list_users_for_admin(
    _admin_user: Annotated[models.User, Depends(require_admin)],
    db: Annotated[Session, Depends(get_db)],
):
    users = db.execute(
        select(models.User).order_by(models.User.id)
    ).scalars().all()

    return users


@app.post(
    "/posts",
    response_model=PostResponse,
    status_code=status.HTTP_201_CREATED,
)
def create_post(
    payload: PostCreate,
    current_user: Annotated[models.User, Depends(get_current_user)],
    db: Annotated[Session, Depends(get_db)],
):
    post = models.Post(
        title=payload.title,
        content=payload.content,
        user_id=current_user.id,
    )

    db.add(post)
    db.commit()
    db.refresh(post)

    return post


@app.get("/posts", response_model=list[PostResponse])
def list_posts(
    db: Annotated[Session, Depends(get_db)],
):
    posts = db.execute(
        select(models.Post).order_by(models.Post.id)
    ).scalars().all()

    return posts


@app.patch("/posts/{post_id}", response_model=PostResponse)
def update_post(
    post_id: int,
    payload: PostUpdate,
    current_user: Annotated[models.User, Depends(get_current_user)],
    db: Annotated[Session, Depends(get_db)],
):
    post = db.get(models.Post, post_id)

    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )

    if post.user_id != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only the owner or an admin can update this post",
        )

    update_data = payload.model_dump(exclude_unset=True)

    for field, value in update_data.items():
        setattr(post, field, value)

    db.commit()
    db.refresh(post)

    return post


@app.delete("/posts/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(
    post_id: int,
    current_user: Annotated[models.User, Depends(get_current_user)],
    db: Annotated[Session, Depends(get_db)],
):
    post = db.get(models.Post, post_id)

    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )

    if post.user_id != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only the owner or an admin can delete this post",
        )

    db.delete(post)
    db.commit()

    return Response(status_code=status.HTTP_204_NO_CONTENT)

Add dependencies

Create the dependency file at requirements.txt. python-multipart is required for OAuth2PasswordRequestForm. passlib handles password hashing. python-jose handles JWT tokens.

requirements.txt
fastapi
uvicorn[standard]
sqlalchemy
email-validator
passlib[bcrypt]
python-jose[cryptography]
python-multipart

Create the Dockerfile

Create the Dockerfile at the project root. The container owns the Python environment and runs the FastAPI app with Uvicorn.

Dockerfile
FROM python:3.12-slim

WORKDIR /app

RUN python -m venv /opt/venv

ENV PATH="/opt/venv/bin:$PATH"
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

COPY requirements.txt .

RUN pip install --upgrade pip
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

Create the Compose file

Create the Docker Compose file at compose.yaml. The SQLite database is stored in a named Docker volume and SECRET_KEY is passed as an environment variable.

compose.yaml
services:
  backend:
    build: .
    container_name: fastapi-04-authentication-authorization
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: sqlite:////data/app.db
      SECRET_KEY: change-this-dev-secret
    volumes:
      - .:/app
      - auth_data:/data

volumes:
  auth_data:

Run the project

Run Docker Compose from inside the project folder. The app creates the SQLite database tables during FastAPI startup.

docker compose up --build

Open the API docs page to test registration, login, protected routes, and authorization rules.

http://localhost:8000/docs

Register and login

Create the first user through POST /auth/register. The first registered user becomes admin in this sample so the admin route can be tested immediately.

{
  "username": "admin",
  "email": "admin@example.com",
  "password": "strongpassword123"
}

Log in through POST /auth/token. The Swagger UI shows username and password form fields for this route.

username: admin
password: strongpassword123

The response returns a bearer token.

{
  "access_token": "jwt-token-value",
  "token_type": "bearer"
}

Use protected routes

Use the Authorize button in Swagger UI and paste the access token. After that, protected routes can resolve the current user.

GET /users/me
POST /posts
PATCH /posts/{post_id}
DELETE /posts/{post_id}

Create a post through POST /posts. The backend assigns user_id from the authenticated user instead of accepting it from the client.

{
  "title": "Authenticated Post",
  "content": "This post belongs to the current user."
}

Authorization rules

Authentication checks who the user is. Authorization checks what the user is allowed to do.

GET /users/me
  requires any logged in user

GET /admin/users
  requires admin user

PATCH /posts/{post_id}
  requires post owner or admin

DELETE /posts/{post_id}
  requires post owner or admin

The current user is loaded from the JWT token. Admin access is checked with require_admin. Owner access is checked by comparing post.user_id with current_user.id.

Mental model

The authentication layer should not be mixed with the authorization rules. Keep current-user resolution, role checks, and ownership checks as separate concepts.

Password hashing
  stores safe password hash

JWT token
  carries login identity

get_current_user
  converts token into database user

require_admin
  checks role based permission

owner check
  compares resource owner with current user

route function
  combines the required access rule with the operation

Stop or reset the project

Stop the running container without deleting the SQLite data.

docker compose down

Reset the project by removing the Docker volume. This deletes the SQLite database records.

docker compose down -v