"""HTTP API exposing subscriptions, notifications, settings, scripts and script logs."""

from contextlib import contextmanager
from datetime import datetime

from fastapi import FastAPI, Query
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn

from model import Log, SessionLocal, Script, Settings, Subscription, Notification
from run_scripts import run_scripts, update_requirements, update_environment

app = FastAPI()

# Update cors
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@contextmanager
def _db_session():
    """Yield a new ``SessionLocal()`` session and always close it afterwards.

    Closing in ``finally`` guarantees the session is released on every exit
    path, including when a handler raises ``HTTPException`` or fails midway.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# Define Pydantic models
class ScriptBase(BaseModel):
    """Fields shared by every script payload."""

    name: str
    script_content: str


class ScriptCreate(ScriptBase):
    """Request body for creating a script."""


class ScriptUpdate(ScriptBase):
    """Request body for updating a script, including its enabled flag."""

    enabled: bool


class ScriptResponse(ScriptBase):
    """Script as returned by the API."""

    id: int
    created_at: datetime
    enabled: bool

    model_config = {"from_attributes": True}


class ScriptLogCreate(BaseModel):
    """Request body for attaching a log entry to a script."""

    message: str
    error_code: int
    error_message: str


@app.get("/")
def hello():
    """Return a static welcome message."""
    return {"message": "Welcome to the Project Monitor API"}


class SubscriptionCreate(BaseModel):
    """Request body for creating a subscription."""

    topic: str


class SubscriptionResponse(BaseModel):
    """Subscription as returned by the API."""

    id: int
    topic: str
    created_at: datetime
    # Not read from a query column here: the handlers compute it and set it
    # on the loaded object before returning.
    has_unread: bool

    model_config = {"from_attributes": True}


# Subscriptions API Endpoints
@app.get("/subscriptions", response_model=list[SubscriptionResponse])
def list_subscriptions():
    """List all subscriptions, each flagged with whether it has unviewed notifications."""
    with _db_session() as db:
        subscriptions = db.query(Subscription).all()

        # One query for every subscription id that still has an unviewed
        # notification, instead of one COUNT query per subscription.
        unread_ids = {
            subscription_id
            for (subscription_id,) in db.query(Notification.subscription_id)
            .filter(~Notification.viewed)
            .distinct()
        }
        for subscription in subscriptions:
            subscription.has_unread = subscription.id in unread_ids

        return subscriptions


@app.get("/subscriptions/{subscription_id}", response_model=SubscriptionResponse)
def get_subscription(subscription_id: int):
    """Return one subscription; respond 404 if it does not exist."""
    with _db_session() as db:
        subscription = (
            db.query(Subscription).filter(Subscription.id == subscription_id).first()
        )
        if not subscription:
            raise HTTPException(status_code=404, detail="Subscription not found")

        # checking if subscription has unread messages
        # The criteria are passed as separate filter arguments (combined with
        # SQL AND) and negated with `~`: Python's `and` / `not` are evaluated
        # by the interpreter and would not become part of the SQL condition.
        unread_count = (
            db.query(Notification)
            .filter(
                Notification.subscription_id == subscription_id,
                ~Notification.viewed,
            )
            .count()
        )
        subscription.has_unread = unread_count > 0
        return subscription


@app.post("/subscriptions")
def add_subscription(subscription: SubscriptionCreate):
    """Create a subscription for a topic; respond 400 if the topic already exists."""
    with _db_session() as db:
        existing_subscription = (
            db.query(Subscription)
            .filter(Subscription.topic == subscription.topic)
            .first()
        )
        if existing_subscription:
            raise HTTPException(status_code=400, detail="Subscription already exists")

        new_subscription = Subscription(topic=subscription.topic)
        db.add(new_subscription)
        db.commit()
        # Reload the row so its attributes are populated before the session closes.
        db.refresh(new_subscription)
        return new_subscription


@app.delete("/subscriptions/{subscription_id}")
def remove_subscription(subscription_id: int):
    """Delete a subscription; respond 404 if it does not exist."""
    with _db_session() as db:
        subscription = (
            db.query(Subscription).filter(Subscription.id == subscription_id).first()
        )
        if not subscription:
            raise HTTPException(status_code=404, detail="Subscription not found")

        db.delete(subscription)
        db.commit()
        return {"message": "Subscription removed"}


@app.get("/subscriptions/{subscription_id}/notifications")
def list_subscription_notifications(
    subscription_id: int,
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0),
):
    """Return one page of a subscription's notifications, newest first."""
    with _db_session() as db:
        notifications = (
            db.query(Notification)
            .filter(Notification.subscription_id == subscription_id)
            .order_by(Notification.created_at.desc())
            .limit(limit)
            .offset(offset)
            .all()
        )
    return [
        NotificationResponse.model_validate(notification)
        for notification in notifications
    ]


@app.get("/notifications")
def list_notifications():
    """Return every notification."""
    with _db_session() as db:
        notifications = db.query(Notification).all()
    return [
        NotificationResponse.model_validate(notification)
        for notification in notifications
    ]


@app.delete("/notifications/{notification_id}")
def remove_notification(notification_id: int):
    """Delete a notification; respond 404 if it does not exist."""
    with _db_session() as db:
        notification = (
            db.query(Notification).filter(Notification.id == notification_id).first()
        )
        if not notification:
            raise HTTPException(status_code=404, detail="Notification not found")

        db.delete(notification)
        db.commit()
        return {"message": "Notification removed"}


class NotificationCreate(BaseModel):
    """Request body for creating a notification."""

    subscription_id: int
    title: str
    message: str
    priority: int


class NotificationUpdate(BaseModel):
    """Partial update for a notification; fields left as ``None`` are not changed."""

    subscription_id: int | None = None
    title: str | None = None
    message: str | None = None
    priority: int | None = None
    viewed: bool | None = None


class NotificationResponse(NotificationCreate):
    """Notification as returned by the API."""

    id: int
    created_at: datetime
    viewed: bool

    model_config = {"from_attributes": True}


@app.put("/notifications/{notification_id}", response_model=NotificationResponse)
def update_notification(notification_id: int, notification: NotificationUpdate):
    """Apply the non-``None`` fields of the payload to a notification; 404 if missing."""
    with _db_session() as db:
        existing_notification = (
            db.query(Notification).filter(Notification.id == notification_id).first()
        )
        if not existing_notification:
            raise HTTPException(status_code=404, detail="Notification not found")

        if notification.subscription_id is not None:
            existing_notification.subscription_id = notification.subscription_id
        if notification.title is not None:
            existing_notification.title = notification.title
        if notification.message is not None:
            existing_notification.message = notification.message
        if notification.priority is not None:
            existing_notification.priority = notification.priority
        if notification.viewed is not None:
            existing_notification.viewed = notification.viewed

        db.commit()
        db.refresh(existing_notification)
        return existing_notification


@app.post("/notifications", response_model=NotificationResponse)
def create_notification(notification: NotificationCreate):
    """Create a notification from the request body and return the stored row."""
    with _db_session() as db:
        new_notification = Notification(
            subscription_id=notification.subscription_id,
            title=notification.title,
            message=notification.message,
            priority=notification.priority,
        )
        db.add(new_notification)
        db.commit()
        db.refresh(new_notification)
        return new_notification


# Define Pydantic models for Settings
class SettingsBase(BaseModel):
    """Fields shared by every settings payload."""

    requirements: str
    environment: str
    user: str
    ntfy_url: str


class SettingsUpdate(SettingsBase):
    """Request body for updating a settings row."""


class SettingsResponse(SettingsBase):
    """Settings row as returned by the API."""

    id: int

    model_config = {"from_attributes": True}


# Settings API Endpoints
@app.get("/settings", response_model=list[SettingsResponse])
def read_settings():
    """Return every settings row."""
    with _db_session() as db:
        return db.query(Settings).all()


@app.post("/settings", response_model=SettingsResponse)
def create_setting(settings: SettingsBase):
    """Create a settings row from the request body and return it."""
    with _db_session() as db:
        new_setting = Settings(**settings.model_dump())
        db.add(new_setting)
        db.commit()
        db.refresh(new_setting)
        return new_setting


@app.get("/settings/{settings_id}", response_model=SettingsResponse)
def read_setting(settings_id: int):
    """Return one settings row; respond 404 if it does not exist."""
    with _db_session() as db:
        setting = db.query(Settings).filter(Settings.id == settings_id).first()
    if not setting:
        raise HTTPException(status_code=404, detail="Setting not found")
    return setting


@app.put("/settings/{settings_id}", response_model=SettingsResponse)
def update_setting(settings_id: int, settings: SettingsUpdate):
    """Update a settings row; respond 404 if it does not exist.

    ``update_requirements`` / ``update_environment`` are called only when the
    corresponding value is non-empty and differs from the stored one.
    """
    with _db_session() as db:
        existing_setting = (
            db.query(Settings).filter(Settings.id == settings_id).first()
        )
        if not existing_setting:
            raise HTTPException(status_code=404, detail="Setting not found")

        if (
            settings.requirements
            and existing_setting.requirements != settings.requirements
        ):
            existing_setting.requirements = settings.requirements
            update_requirements(settings)

        if (
            settings.environment
            and existing_setting.environment != settings.environment
        ):
            existing_setting.environment = settings.environment
            update_environment(settings)

        if settings.ntfy_url is not None:
            existing_setting.ntfy_url = settings.ntfy_url

        # NOTE(review): `settings.user` is accepted in the payload but never
        # written to the row -- confirm whether that is intentional.

        db.commit()
        db.refresh(existing_setting)
        return existing_setting


@app.get("/script", response_model=list[ScriptResponse])
def read_scripts():
    """Return every script."""
    with _db_session() as db:
        return db.query(Script).all()


@app.post("/script", response_model=ScriptResponse)
def create_script(script: ScriptCreate):
    """Create a script from the request body and return the stored row."""
    with _db_session() as db:
        new_script = Script(name=script.name, script_content=script.script_content)
        db.add(new_script)
        db.commit()
        db.refresh(new_script)
        return new_script


@app.get("/script/{script_id}", response_model=ScriptResponse)
def read_script(script_id: int):
    """Return one script; respond 404 if it does not exist."""
    with _db_session() as db:
        script = db.query(Script).filter(Script.id == script_id).first()
    if not script:
        raise HTTPException(status_code=404, detail="Script not found")
    return script


@app.delete("/script/{script_id}")
def delete_script(script_id: int):
    """Delete a script together with its log entries; respond 404 if it does not exist."""
    with _db_session() as db:
        script = db.query(Script).filter(Script.id == script_id).first()
        if not script:
            raise HTTPException(status_code=404, detail="Script not found")

        # Remove the dependent logs before the script itself, so the log query
        # is not issued while the parent row is already pending deletion.
        for log in db.query(Log).filter(Log.script_id == script_id).all():
            db.delete(log)
        db.delete(script)
        db.commit()
        return {"message": "Script deleted"}


@app.put("/script/{script_id}", response_model=ScriptResponse)
def update_script(script_id: int, script: ScriptUpdate):
    """Overwrite a script's name, content and enabled flag; respond 404 if missing."""
    with _db_session() as db:
        existing_script = db.query(Script).filter(Script.id == script_id).first()
        if not existing_script:
            raise HTTPException(status_code=404, detail="Script not found")

        existing_script.name = script.name
        existing_script.script_content = script.script_content
        existing_script.enabled = script.enabled
        db.commit()
        db.refresh(existing_script)
        return existing_script


@app.get("/script/{script_id}/log")
def get_script_logs(script_id: int):
    """Return every log entry recorded for the given script id."""
    with _db_session() as db:
        return db.query(Log).filter(Log.script_id == script_id).all()


@app.post("/script/{script_id}/log")
def create_script_log(script_id: int, log: ScriptLogCreate):
    """Store a log entry for the given script id and return it."""
    with _db_session() as db:
        new_log = Log(
            script_id=script_id,
            message=log.message,
            error_code=log.error_code,
            error_message=log.error_message,
        )
        db.add(new_log)
        db.commit()
        db.refresh(new_log)
        return new_log


@app.delete("/script/{script_id}/log/{log_id}")
def delete_script_log(script_id: int, log_id: int):
    """Delete one log entry of a script; respond 404 if no such entry exists."""
    with _db_session() as db:
        # Both criteria are passed separately so they are ANDed in SQL; with
        # Python's `and` the script_id condition would never reach the query
        # and a log of a different script could be deleted.
        log = (
            db.query(Log)
            .filter(Log.id == log_id, Log.script_id == script_id)
            .first()
        )
        if not log:
            raise HTTPException(status_code=404, detail="Log not found")

        db.delete(log)
        db.commit()
        return {"message": "Log deleted"}


@app.post("/script/{script_id}/execute")
def execute_script(script_id: int):
    """Run the given script via ``run_scripts`` and acknowledge the request."""
    run_scripts([script_id])
    return {"run_script": True}


@app.get("/health")
def health_check():
    """Return a static health status."""
    return {"status": "healthy"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)