indie-status-page/app/auth.py

99 lines
No EOL
3.2 KiB
Python

"""JWT authentication and password hashing utilities."""
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.dependencies import get_db
from app.models.saas_models import User, Organization, OrganizationMember
# Passlib context shared by hash_password() and verify_password();
# bcrypt is the only configured hashing scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 bearer scheme that get_current_user() depends on to obtain the token;
# tokenUrl names the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Report whether ``plain_password`` matches ``hashed_password``.

    The comparison is delegated to the module's ``pwd_context``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(user_id: str, exp_hours: int = 72) -> str:
    """Create a signed JWT access token for the given user ID.

    Args:
        user_id: Value stored in the token's ``sub`` claim.
        exp_hours: Token lifetime in hours, counted from the moment of the
            call. Defaults to 72.

    Returns:
        The token encoded with ``settings.secret_key`` using HS256.
    """
    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value that is easily mistaken for
    # local time. The resulting ``exp`` instant is the same.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=exp_hours)
    payload = {
        "sub": user_id,
        "exp": expires_at,
    }
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")
def decode_access_token(token: str) -> dict:
    """Verify ``token`` against ``settings.secret_key`` and return its claims.

    Only HS256 is accepted. Raises JWTError on failure.
    """
    claims = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
    return claims
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """FastAPI dependency: resolve the current user from the bearer JWT.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Async database session supplied by ``get_db``.

    Returns:
        The ``User`` whose ``id`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token fails to decode, carries no
            ``sub`` claim, or no matching user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body to the single call that can raise JWTError.
    try:
        payload = decode_access_token(token)
    except JWTError as exc:
        # Chain the cause so the underlying JWT failure stays visible in
        # tracebacks; the HTTP response itself is unchanged.
        raise credentials_exception from exc

    user_id: str | None = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise credentials_exception
    return user
async def get_current_org(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Organization:
    """FastAPI dependency: get the user's first organization (simplified).

    In the future, this will support org-switching via header or session.

    Args:
        user: Authenticated user supplied by ``get_current_user``.
        db: Async database session supplied by ``get_db``.

    Returns:
        The ``Organization`` referenced by the first membership row found
        for the user. No ordering is applied, so "first" is whatever the
        database returns first.

    Raises:
        HTTPException: 403 when the user has no membership, or when the
            membership's organization row does not exist.
    """
    # Only one membership is used, so ask the database for a single row
    # instead of fetching every membership and discarding the rest.
    membership_result = await db.execute(
        select(OrganizationMember)
        .where(OrganizationMember.user_id == user.id)
        .limit(1)
    )
    membership = membership_result.scalars().first()
    if membership is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User is not a member of any organization",
        )

    org_result = await db.execute(
        select(Organization).where(Organization.id == membership.organization_id)
    )
    org = org_result.scalar_one_or_none()
    if org is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Organization not found",
        )
    return org