from contextlib import asynccontextmanager from datetime import datetime from fastapi import FastAPI, Request from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from app.config import settings from app.database import init_db from app.api.router import api_v1_router @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan: create DB directories and tables on startup.""" # Ensure the data directory exists for SQLite db_path = settings.db_path db_path.parent.mkdir(parents=True, exist_ok=True) # Create tables (dev mode; use Alembic in production) await init_db() # Start the uptime monitoring scheduler from app.services.scheduler import start_scheduler, shutdown_scheduler start_scheduler() yield # Shutdown scheduler on exit shutdown_scheduler() app = FastAPI( title=settings.app_name, version="0.1.0", description="Lightweight status page tool for indie SaaS developers", lifespan=lifespan, ) # API routes app.include_router(api_v1_router, prefix="/api/v1") # Static files and templates app.mount("/static", StaticFiles(directory="app/static"), name="static") templates = Jinja2Templates(directory="app/templates") @app.get("/health") async def health_check(): """Health check endpoint for container orchestration.""" return {"status": "ok", "version": "0.1.0"} async def _get_service_status(service_id: str, db) -> str: """Derive a service's current status from its monitors' latest results.""" from sqlalchemy import select from app.models.models import Monitor, MonitorResult # Get all monitors for this service result = await db.execute( select(Monitor).where(Monitor.service_id == service_id, Monitor.is_active == True) # noqa: E712 ) monitors = result.scalars().all() if not monitors: return "up" # No monitors = assume operational # For each monitor, get the latest result worst_status = "up" status_priority = {"up": 0, "degraded": 1, "down": 2} for monitor in monitors: r = await db.execute( select(MonitorResult) .where(MonitorResult.monitor_id == monitor.id) .order_by(MonitorResult.checked_at.desc()) .limit(1) ) latest = r.scalar_one_or_none() if latest: if status_priority.get(latest.status, 0) > status_priority.get(worst_status, 0): worst_status = latest.status return worst_status @app.get("/") async def status_page(request: Request): """Public status page — shows all visible services and recent incidents.""" from sqlalchemy import select from app.database import async_session_factory from app.models.models import Service, Incident async with async_session_factory() as db: # Get all visible services, ordered by position then name result = await db.execute( select(Service).where(Service.is_visible == True).order_by(Service.position, Service.name) # noqa: E712 ) services = result.scalars().all() # Build services_by_group dict and attach current_status services_by_group = {} service_list = [] for s in services: current_status = await _get_service_status(s.id, db) svc_data = { "id": s.id, "name": s.name, "slug": s.slug, "description": s.description, "group_name": s.group_name, "position": s.position, "current_status": current_status, } service_list.append(svc_data) group = s.group_name or "Services" if group not in services_by_group: services_by_group[group] = [] services_by_group[group].append(svc_data) # Get recent unresolved + recently resolved incidents result = await db.execute( select(Incident).order_by(Incident.started_at.desc()).limit(20) ) incidents = result.scalars().all() incident_list = [ { "id": str(i.id), "title": i.title, "status": i.status, "severity": i.severity, "started_at": i.started_at.isoformat() if i.started_at else None, "resolved_at": i.resolved_at.isoformat() if i.resolved_at else None, "service_id": str(i.service_id), } for i in incidents ] # Check for active (unresolved) incidents has_active = any(i["status"] != "resolved" for i in incident_list) return templates.TemplateResponse( "status.html", { "request": request, "site_name": settings.site_name, "services_by_group": services_by_group, "incidents": incident_list, "has_active_incidents": has_active, "now": datetime.utcnow(), }, ) @app.get("/incident/{incident_id}") async def incident_detail_page(request: Request, incident_id: str): """Public incident detail page with timeline of updates.""" from sqlalchemy import select from sqlalchemy.orm import selectinload from app.database import async_session_factory from app.models.models import Incident, IncidentUpdate async with async_session_factory() as db: result = await db.execute( select(Incident) .options(selectinload(Incident.updates)) .where(Incident.id == incident_id) ) incident = result.scalar_one_or_none() if not incident: from fastapi.responses import HTMLResponse return HTMLResponse("