- 8 DB models (services, incidents, monitors, subscribers, etc.)
- Full CRUD API for services, incidents, monitors
- Public status page with live data
- Incident detail page with timeline
- API key authentication
- Uptime monitoring scheduler
- 13 tests passing
- TECHNICAL_DESIGN.md with full spec
195 lines
No EOL
6.3 KiB
Python
195 lines
No EOL
6.3 KiB
Python
"""Incidents API endpoints."""
|
|
|
|
from datetime import datetime
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.dependencies import get_db, verify_api_key
|
|
from app.models.models import Incident, IncidentUpdate
|
|
|
|
# Router that the endpoint decorators in this module register their routes on.
router = APIRouter()
|
|
|
|
|
|
class IncidentCreate(BaseModel):
    # Request body accepted by the create-incident endpoint.
    # (Documented with comments rather than a docstring so the generated
    # JSON schema for this model stays exactly as it was.)

    # Identifier of the service the incident is attached to.
    service_id: UUID

    # Required headline, capped at 200 characters.
    title: str = Field(default=..., max_length=200)

    # Required; must be one of the four values listed in the pattern.
    status: str = Field(
        default=...,
        pattern=r"^(investigating|identified|monitoring|resolved)$",
    )

    # Required; must be one of the three values listed in the pattern.
    severity: str = Field(
        default=...,
        pattern=r"^(minor|major|outage)$",
    )

    # Optional start time; may be omitted or null.
    started_at: datetime | None = None
|
|
|
|
|
|
class IncidentUpdateCreate(BaseModel):
    # Request body for adding an update entry to an existing incident.
    # (Documented with comments rather than a docstring so the generated
    # JSON schema for this model stays exactly as it was.)

    # Required; must be one of the four values listed in the pattern.
    status: str = Field(
        default=...,
        pattern=r"^(investigating|identified|monitoring|resolved)$",
    )

    # Required text of the update; no length constraint is declared.
    body: str
|
|
|
|
|
|
class IncidentPatch(BaseModel):
    # Partial-update body for an incident: every field is optional and
    # defaults to None, so a client may send any subset of them.
    # (Documented with comments rather than a docstring so the generated
    # JSON schema description is not altered.)

    # Same 200-character cap that IncidentCreate applies to `title`; without
    # it a PATCH could store a title that creation would have rejected.
    title: str | None = Field(None, max_length=200)

    # When present, must be one of the four values listed in the pattern.
    status: str | None = Field(
        None,
        pattern=r"^(investigating|identified|monitoring|resolved)$",
    )

    # When present, must be one of the three values listed in the pattern.
    severity: str | None = Field(None, pattern=r"^(minor|major|outage)$")
|
|
|
|
|
|
def _isoformat_or_none(value):
    """Return ``value.isoformat()``, or ``None`` when *value* is falsy."""
    return value.isoformat() if value else None


def serialize_incident(i: Incident) -> dict:
    """Convert an incident into a plain dictionary.

    Args:
        i: Object exposing ``id``, ``service_id``, ``title``, ``status``,
            ``severity`` and the four timestamp attributes read below.

    Returns:
        A dict with the keys ``id``, ``service_id``, ``title``, ``status``,
        ``severity``, ``started_at``, ``resolved_at``, ``created_at`` and
        ``updated_at``. The first five values are copied unchanged; each
        timestamp is rendered with ``isoformat()`` or is ``None`` when unset.
    """
    return {
        "id": i.id,
        "service_id": i.service_id,
        "title": i.title,
        "status": i.status,
        "severity": i.severity,
        "started_at": _isoformat_or_none(i.started_at),
        "resolved_at": _isoformat_or_none(i.resolved_at),
        "created_at": _isoformat_or_none(i.created_at),
        "updated_at": _isoformat_or_none(i.updated_at),
    }
|
|
|
|
|
|
async def serialize_incident_detail(i: Incident, db: AsyncSession) -> dict:
    """Return the incident dictionary extended with an ``updates`` list.

    The update rows are fetched with their own SELECT, filtered by the
    incident's id and ordered by ``created_at``, instead of being read from
    a relationship attribute on the incident.
    """
    updates_query = (
        select(IncidentUpdate)
        .where(IncidentUpdate.incident_id == i.id)
        .order_by(IncidentUpdate.created_at)
    )

    payload = serialize_incident(i)
    rows = (await db.execute(updates_query)).scalars().all()

    timeline = []
    for entry in rows:
        stamp = entry.created_at.isoformat() if entry.created_at else None
        timeline.append(
            {
                "id": entry.id,
                "status": entry.status,
                "body": entry.body,
                "created_at": stamp,
            }
        )

    payload["updates"] = timeline
    return payload
|
|
|
|
|
|
@router.get("/")
async def list_incidents(
    service_id: UUID | None = None,
    status: str | None = None,
    limit: int = 50,
    offset: int = 0,
    db: AsyncSession = Depends(get_db),
):
    """List incidents with optional filtering."""
    # The docstring is left as-is because FastAPI publishes it as the
    # operation description; the full contract is documented here instead.
    #
    # Query parameters:
    #   service_id - when given, only incidents of that service are returned.
    #   status     - when non-empty, only incidents with that exact status.
    #                (Inside this function the name shadows the imported
    #                `status` module, which is not needed here.)
    #   limit      - page size, must be >= 0 (default 50).
    #   offset     - rows to skip, must be >= 0 (default 0).
    # Returns: a list of dicts produced by serialize_incident, newest
    #   `started_at` first.
    # Raises: HTTPException(422) when limit or offset is negative.
    #
    # NOTE(review): this endpoint has no API-key dependency and `limit` has no
    # upper bound - consider capping it if the table can grow large.

    # Negative values used to be handed straight to OFFSET/LIMIT; reject them
    # up front so the client gets a clear validation error instead.
    if limit < 0 or offset < 0:
        raise HTTPException(
            status_code=422,
            detail="limit and offset must be non-negative",
        )

    # `id` is a tie-breaker so that offset-based pages stay stable when
    # several incidents share the same `started_at`.
    query = select(Incident).order_by(Incident.started_at.desc(), Incident.id)

    if service_id is not None:
        # Ids are compared as strings, like everywhere else in this module.
        query = query.where(Incident.service_id == str(service_id))
    if status:
        query = query.where(Incident.status == status)

    query = query.offset(offset).limit(limit)
    result = await db.execute(query)
    incidents = result.scalars().all()
    return [serialize_incident(i) for i in incidents]
|
|
|
|
|
|
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_incident(
    data: IncidentCreate,
    db: AsyncSession = Depends(get_db),
    api_key: str = Depends(verify_api_key),
):
    """Create a new incident."""
    # `api_key` is not read in the body; it is declared so that the
    # verify_api_key dependency runs for this route.

    # Use the client-supplied start time, or the current naive UTC time
    # when none was sent.
    began_at = data.started_at if data.started_at else datetime.utcnow()

    new_incident = Incident(
        service_id=str(data.service_id),
        title=data.title,
        status=data.status,
        severity=data.severity,
        started_at=began_at,
    )
    db.add(new_incident)

    # Flush and refresh before serializing the row; no commit is issued
    # in this function.
    await db.flush()
    await db.refresh(new_incident)

    return serialize_incident(new_incident)
|
|
|
|
|
|
@router.get("/{incident_id}")
async def get_incident(incident_id: UUID, db: AsyncSession = Depends(get_db)):
    """Get an incident with its updates."""
    # The path id is compared in its string form.
    lookup = select(Incident).where(Incident.id == str(incident_id))
    found = (await db.execute(lookup)).scalar_one_or_none()

    if found:
        return await serialize_incident_detail(found, db)

    # No matching row: answer 404.
    raise HTTPException(status_code=404, detail="Incident not found")
|
|
|
|
|
|
@router.patch("/{incident_id}")
async def update_incident(
    incident_id: UUID,
    data: IncidentPatch,
    db: AsyncSession = Depends(get_db),
    api_key: str = Depends(verify_api_key),
):
    """Update incident fields (title, status, severity)."""
    # The docstring is left as-is because FastAPI publishes it as the
    # operation description; the full contract is documented here instead.
    #
    # Parameters:
    #   incident_id - id of the incident to modify (matched as a string).
    #   data        - fields to change; omitted or null fields are left alone.
    #   api_key     - unused in the body; declared so verify_api_key runs.
    # Returns: the incident with its updates (serialize_incident_detail).
    # Raises: HTTPException(404) when no incident has that id.
    result = await db.execute(select(Incident).where(Incident.id == str(incident_id)))
    incident = result.scalar_one_or_none()
    if incident is None:
        raise HTTPException(status_code=404, detail="Incident not found")

    # Remember the status before applying changes so a real transition can be
    # told apart from a repeated "resolved".
    previous_status = incident.status

    # exclude_unset drops fields the client did not send; exclude_none also
    # drops explicit JSON nulls, which would otherwise be written onto the row
    # (these columns are presumably non-nullable - confirm against the model).
    changes = data.model_dump(exclude_unset=True, exclude_none=True)
    for field, value in changes.items():
        setattr(incident, field, value)

    new_status = changes.get("status")
    if new_status == "resolved":
        # Stamp the resolution time only when entering the resolved state, so
        # re-sending "resolved" does not move an existing timestamp.
        if previous_status != "resolved" or incident.resolved_at is None:
            incident.resolved_at = datetime.utcnow()
    elif new_status is not None:
        # Reopened (or still open): a stale resolution time must not linger.
        incident.resolved_at = None

    await db.flush()
    await db.refresh(incident)
    return await serialize_incident_detail(incident, db)
|
|
|
|
|
|
@router.delete("/{incident_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_incident(
    incident_id: UUID,
    db: AsyncSession = Depends(get_db),
    api_key: str = Depends(verify_api_key),
):
    """Delete an incident."""
    # `api_key` is not read in the body; it is declared so that the
    # verify_api_key dependency runs for this route.
    lookup = select(Incident).where(Incident.id == str(incident_id))
    target = (await db.execute(lookup)).scalar_one_or_none()

    # Unknown id: answer 404 instead of deleting nothing.
    if not target:
        raise HTTPException(status_code=404, detail="Incident not found")

    # Only the delete is issued here; there is no flush or commit in this
    # function.
    await db.delete(target)
|
|
|
|
|
|
@router.post("/{incident_id}/updates", status_code=status.HTTP_201_CREATED)
async def create_incident_update(
    incident_id: UUID,
    data: IncidentUpdateCreate,
    db: AsyncSession = Depends(get_db),
    api_key: str = Depends(verify_api_key),
):
    """Add an update to an incident."""
    # The docstring is left as-is because FastAPI publishes it as the
    # operation description; the full contract is documented here instead.
    #
    # Parameters:
    #   incident_id - id of the incident receiving the update.
    #   data        - status and body text of the new update.
    #   api_key     - unused in the body; declared so verify_api_key runs.
    # Returns: dict with the new update's id, incident_id, status, body and
    #   ISO-formatted created_at (None when unset).
    # Raises: HTTPException(404) when no incident has that id.
    result = await db.execute(select(Incident).where(Incident.id == str(incident_id)))
    incident = result.scalar_one_or_none()
    if incident is None:
        raise HTTPException(status_code=404, detail="Incident not found")

    # Captured before the status is overwritten below, to detect a real
    # transition into (or out of) the resolved state.
    previous_status = incident.status

    update = IncidentUpdate(
        incident_id=str(incident_id),
        status=data.status,
        body=data.body,
    )
    db.add(update)

    # The incident always takes the status of its newest update.
    incident.status = data.status

    if data.status == "resolved":
        # Stamp the resolution time only when entering the resolved state, so
        # a follow-up "resolved" note does not move an existing timestamp.
        if previous_status != "resolved" or incident.resolved_at is None:
            incident.resolved_at = datetime.utcnow()
    else:
        # Any non-resolved status means the incident is open (again), so a
        # previous resolution time no longer applies.
        incident.resolved_at = None

    await db.flush()
    await db.refresh(update)
    return {
        "id": update.id,
        "incident_id": update.incident_id,
        "status": update.status,
        "body": update.body,
        "created_at": update.created_at.isoformat() if update.created_at else None,
    }