# Files
# Project-Monitor/backend/backend.py
# 2025-11-01 17:05:10 +01:00
#
# 602 lines
# 17 KiB
# Python

from datetime import datetime
from fastapi import FastAPI, Depends, HTTPException, status, Query
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from model import Log, SessionLocal, Script, Settings, Subscription, Notification, User
from run_scripts import run_scripts, update_requirements, update_environment
import uvicorn
from passlib.context import CryptContext
import os
from model import ensure_default_setting
from auth import (
get_password_hash,
create_access_token,
authenticate_user,
get_current_user,
)
# Application instance; every route in this module is registered on it.
app = FastAPI()
# JWT settings
# SECRET_KEY comes from the environment. The empty-string default exists only
# so the check below can fail fast with a clear error at import time.
SECRET_KEY = os.getenv("SECRET_KEY", "")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
if SECRET_KEY == "":
    raise ValueError("SECRET_KEY environment variable is not set")
# NOTE(review): pwd_context and oauth2_scheme are not referenced by the code
# visible in this module (hashing and token handling come from `auth`) --
# confirm whether they are still needed here.
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
# Called once at import time; implemented in model.py.
ensure_default_setting()
# CORS configuration
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- consider restricting allow_origins to the known frontend(s).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# User registration/login models
# Request body for POST /register.
class UserCreate(BaseModel):
    username: str
    # Plain-text password as submitted; register() hashes it before storing.
    password: str
# Response body returned by /register and /login.
class Token(BaseModel):
    access_token: str
    # Both endpoints return the literal "bearer" here.
    token_type: str
# Define Pydantic models
# Fields shared by every script request/response model.
class ScriptBase(BaseModel):
    name: str
    script_content: str
# Request body for POST /script; adds nothing to ScriptBase.
class ScriptCreate(ScriptBase):
    pass
# Request body for PUT /script/{script_id}; all fields are required.
class ScriptUpdate(ScriptBase):
    enabled: bool
# Response shape for script endpoints.
class ScriptResponse(ScriptBase):
    id: int
    created_at: datetime
    enabled: bool
    # from_attributes lets the model be populated from ORM objects.
    model_config = {"from_attributes": True}
# Request body for POST /script/{script_id}/log.
class ScriptLogCreate(BaseModel):
    message: str
    error_code: int
    error_message: str
# Root endpoint: responds with a static welcome message.
@app.get("/")
def hello():
    greeting = {"message": "Welcome to the Project Monitor API"}
    return greeting
@app.post("/register", response_model=Token)
def register(user: UserCreate):
    """Create a new user account and return a bearer token for it.

    Raises:
        HTTPException: 400 if the username is already registered.
    """
    db = SessionLocal()
    try:
        existing_user = db.query(User).filter(User.username == user.username).first()
        if existing_user:
            raise HTTPException(status_code=400, detail="Username already registered")
        new_user = User(
            username=user.username,
            password_hash=get_password_hash(user.password),
        )
        db.add(new_user)
        db.commit()
        db.refresh(new_user)
        access_token = create_access_token(data={"sub": new_user.username})
        return {"access_token": access_token, "token_type": "bearer"}
    finally:
        # Release the session on every path, including when hashing, the
        # commit or token creation raises.
        db.close()
@app.post("/login", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate with username/password form data and return a bearer token.

    Raises:
        HTTPException: 401 if the credentials are rejected.
    """
    db = SessionLocal()
    try:
        user = authenticate_user(db, form_data.username, form_data.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect username or password",
                headers={"WWW-Authenticate": "Bearer"},
            )
        access_token = create_access_token(data={"sub": user.username})
        return {"access_token": access_token, "token_type": "bearer"}
    finally:
        # Release the session even if authentication or token creation raises.
        db.close()
# Request body for POST /subscriptions.
class SubscriptionCreate(BaseModel):
    topic: str
# Response shape for subscription endpoints.
class SubscriptionResponse(BaseModel):
    id: int
    topic: str
    created_at: datetime
    # Set by the endpoints on each subscription before it is returned:
    # whether the subscription has notifications not yet marked viewed.
    has_unread: bool
    # from_attributes lets the model be populated from ORM objects.
    model_config = {"from_attributes": True}
# Subscriptions API Endpoints
@app.get("/subscriptions", response_model=list[SubscriptionResponse])
def list_subscriptions(current_user: User = Depends(get_current_user)):
    """Return the current user's subscriptions, each flagged with ``has_unread``."""
    db = SessionLocal()
    try:
        subscriptions = (
            db.query(Subscription).filter(Subscription.user_id == current_user.id).all()
        )
        unread_ids = set()
        if subscriptions:
            # One query for all subscriptions instead of one COUNT per
            # subscription: fetch the ids that have at least one unviewed
            # notification.
            rows = (
                db.query(Notification.subscription_id)
                .filter(
                    Notification.subscription_id.in_([s.id for s in subscriptions]),
                    ~Notification.viewed,
                )
                .distinct()
                .all()
            )
            unread_ids = {row[0] for row in rows}
        for subscription in subscriptions:
            # Set on the instance so SubscriptionResponse can read it.
            subscription.has_unread = subscription.id in unread_ids
        return subscriptions
    finally:
        # Release the session even if a query raises.
        db.close()
@app.get("/subscriptions/{subscription_id}", response_model=SubscriptionResponse)
def get_subscription(
    subscription_id: int, current_user: User = Depends(get_current_user)
):
    """Return one of the current user's subscriptions with its ``has_unread`` flag.

    Raises:
        HTTPException: 404 if the subscription does not exist or belongs to
            another user.
    """
    db = SessionLocal()
    try:
        subscription = (
            db.query(Subscription)
            .filter(
                Subscription.id == subscription_id,
                # Scope to the caller, as list_subscriptions does.
                Subscription.user_id == current_user.id,
            )
            .first()
        )
        if not subscription:
            raise HTTPException(status_code=404, detail="Subscription not found")
        # Conditions are passed as separate filter() arguments (SQL AND).
        # Python's `and` / `not` cannot combine column expressions: the
        # previous `a == b and not c` reduced to just `a == b`, so the
        # `viewed` condition was never applied.
        unread_count = (
            db.query(Notification)
            .filter(
                Notification.subscription_id == subscription_id,
                ~Notification.viewed,
            )
            .count()
        )
        subscription.has_unread = unread_count > 0
        return subscription
    finally:
        # Release the session on every path.
        db.close()
@app.post("/subscriptions")
def add_subscription(
    subscription: SubscriptionCreate, current_user: User = Depends(get_current_user)
):
    """Create a subscription to ``topic`` owned by the current user.

    Raises:
        HTTPException: 400 if a subscription with this topic already exists.
    """
    db = SessionLocal()
    try:
        # NOTE(review): the duplicate check is global (any user's subscription
        # to the same topic blocks creation) -- confirm this is intended.
        existing_subscription = (
            db.query(Subscription)
            .filter(Subscription.topic == subscription.topic)
            .first()
        )
        if existing_subscription:
            raise HTTPException(status_code=400, detail="Subscription already exists")
        new_subscription = Subscription(
            topic=subscription.topic, user_id=current_user.id
        )
        db.add(new_subscription)
        db.commit()
        db.refresh(new_subscription)
        return new_subscription
    finally:
        # Release the session even if the commit raises.
        db.close()
@app.delete("/subscriptions/{subscription_id}")
def remove_subscription(
    subscription_id: int, current_user: User = Depends(get_current_user)
):
    """Delete one of the current user's subscriptions.

    Raises:
        HTTPException: 404 if the subscription does not exist or belongs to
            another user.
    """
    db = SessionLocal()
    try:
        subscription = (
            db.query(Subscription)
            .filter(
                Subscription.id == subscription_id,
                # Only the owner may delete a subscription.
                Subscription.user_id == current_user.id,
            )
            .first()
        )
        if not subscription:
            raise HTTPException(status_code=404, detail="Subscription not found")
        db.delete(subscription)
        db.commit()
        return {"message": "Subscription removed"}
    finally:
        # Release the session on every path.
        db.close()
@app.get("/subscriptions/{subscription_id}/notifications")
def list_subscription_notifications(
    subscription_id: int,
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0),
    current_user: User = Depends(get_current_user),
):
    """Return one page of a subscription's notifications, newest first.

    Args:
        subscription_id: Subscription whose notifications are listed.
        limit: Page size (1-100, default 20).
        offset: Number of notifications to skip (default 0).
    """
    # NOTE(review): the subscription's owner is not checked against
    # current_user here -- confirm whether that is intended.
    db = SessionLocal()
    try:
        notifications = (
            db.query(Notification)
            .filter(Notification.subscription_id == subscription_id)
            .order_by(Notification.created_at.desc())
            .limit(limit)
            .offset(offset)
            .all()
        )
        return [
            NotificationResponse.model_validate(notification)
            for notification in notifications
        ]
    finally:
        # Release the session even if the query or validation raises.
        db.close()
@app.post("/subscriptions/{subscription_id}/notifications")
def set_all_notifications_viewed(
    subscription_id: int,
    current_user: User = Depends(get_current_user),
):
    """Mark every notification of the given subscription as viewed."""
    # NOTE(review): the subscription's owner is not checked against
    # current_user here -- confirm whether that is intended.
    db = SessionLocal()
    try:
        # Single UPDATE statement instead of loading every row just to flip
        # one flag. No objects are loaded in this session, so nothing needs
        # synchronising.
        db.query(Notification).filter(
            Notification.subscription_id == subscription_id
        ).update({"viewed": True}, synchronize_session=False)
        db.commit()
        return {"message": "Notifications marked as viewed"}
    finally:
        # Release the session even if the update or commit raises.
        db.close()
@app.delete("/subscriptions/{subscription_id}/notifications")
def remove_subscription_notifications(
    subscription_id: int, current_user: User = Depends(get_current_user)
):
    """Delete every notification of the given subscription."""
    # NOTE(review): the subscription's owner is not checked against
    # current_user here -- confirm whether that is intended.
    db = SessionLocal()
    try:
        notifications = (
            db.query(Notification)
            .filter(Notification.subscription_id == subscription_id)
            .all()
        )
        # Deleted one by one through the session (rather than a bulk DELETE)
        # so any ORM-level cascades configured on the model still apply.
        for notification in notifications:
            db.delete(notification)
        db.commit()
        return {"message": "Notifications removed"}
    finally:
        # Release the session even if the delete or commit raises.
        db.close()
@app.get("/notifications")
def list_notifications(current_user: User = Depends(get_current_user)):
    """Return all stored notifications."""
    # NOTE(review): the query is not scoped to current_user, so every
    # authenticated user receives every notification -- confirm whether it
    # should be restricted to the caller's subscriptions.
    db = SessionLocal()
    try:
        notifications = db.query(Notification).all()
        return [
            NotificationResponse.model_validate(notification)
            for notification in notifications
        ]
    finally:
        # Release the session even if the query or validation raises.
        db.close()
@app.delete("/notifications/{notification_id}")
def remove_notification(
    notification_id: int, current_user: User = Depends(get_current_user)
):
    """Delete a single notification by id.

    Raises:
        HTTPException: 404 if no notification has this id.
    """
    # NOTE(review): ownership of the notification is not checked against
    # current_user -- confirm whether that is intended.
    db = SessionLocal()
    try:
        notification = (
            db.query(Notification).filter(Notification.id == notification_id).first()
        )
        if not notification:
            raise HTTPException(status_code=404, detail="Notification not found")
        db.delete(notification)
        db.commit()
        return {"message": "Notification removed"}
    finally:
        # Release the session on every path.
        db.close()
# Request body for POST /notifications.
class NotificationCreate(BaseModel):
    # Id of the subscription the notification is attached to.
    subscription_id: int
    title: str
    message: str
    priority: int
# Request body for PUT /notifications/{notification_id}.
# Every field is optional; update_notification only applies the fields
# that are not None.
class NotificationUpdate(BaseModel):
    subscription_id: int | None = None
    title: str | None = None
    message: str | None = None
    priority: int | None = None
    viewed: bool | None = None
# Response shape for notification endpoints: the creation fields plus
# id, created_at and viewed.
class NotificationResponse(NotificationCreate):
    id: int
    created_at: datetime
    viewed: bool
    # from_attributes lets the model be populated from ORM objects.
    model_config = {"from_attributes": True}
@app.put("/notifications/{notification_id}", response_model=NotificationResponse)
def update_notification(
    notification_id: int,
    notification: NotificationUpdate,
    current_user: User = Depends(get_current_user),
):
    """Partially update a notification; fields left as ``None`` are not changed.

    Raises:
        HTTPException: 404 if no notification has this id.
    """
    # NOTE(review): ownership of the notification is not checked against
    # current_user -- confirm whether that is intended.
    db = SessionLocal()
    try:
        existing_notification = (
            db.query(Notification).filter(Notification.id == notification_id).first()
        )
        if not existing_notification:
            raise HTTPException(status_code=404, detail="Notification not found")
        # exclude_none keeps exactly the fields the caller supplied with a
        # value, matching the previous per-field "is not None" checks.
        for field, value in notification.model_dump(exclude_none=True).items():
            setattr(existing_notification, field, value)
        db.commit()
        db.refresh(existing_notification)
        return existing_notification
    finally:
        # Release the session on every path.
        db.close()
@app.post("/notifications", response_model=NotificationResponse)
def create_notification(
    notification: NotificationCreate, current_user: User = Depends(get_current_user)
):
    """Store a new notification for the given subscription and return it."""
    # NOTE(review): the target subscription's owner is not checked against
    # current_user -- confirm whether that is intended.
    db = SessionLocal()
    try:
        # NotificationCreate's fields map one-to-one onto the keyword
        # arguments previously passed to Notification by hand.
        new_notification = Notification(**notification.model_dump())
        db.add(new_notification)
        db.commit()
        db.refresh(new_notification)
        return new_notification
    finally:
        # Release the session even if the commit raises.
        db.close()
# Define Pydantic models for Settings
# Fields shared by the settings request/response models.
class SettingsBase(BaseModel):
    requirements: str
    environment: str
    ntfy_url: str
# Request body for PUT /settings/{settings_id}; adds nothing to SettingsBase.
class SettingsUpdate(SettingsBase):
    pass
# Response shape for settings endpoints.
class SettingsResponse(SettingsBase):
    id: int
    # from_attributes lets the model be populated from ORM objects.
    model_config = {"from_attributes": True}
# Settings API Endpoints
@app.get("/settings", response_model=SettingsResponse)
def read_settings(current_user: User = Depends(get_current_user)):
    """Return the current user's settings row, creating a default one if missing.

    Raises:
        HTTPException: 400 if the user has more than one settings row.
    """
    db = SessionLocal()
    try:
        settings = db.query(Settings).filter(Settings.user_id == current_user.id).all()
        if not settings:
            # Add a default settings row for this user if not found
            new_setting = Settings(
                requirements="",
                environment="",
                user_id=current_user.id,
                ntfy_url="https://ntfy.abzk.fr",
            )
            db.add(new_setting)
            db.commit()
            db.refresh(new_setting)
            return new_setting
        if len(settings) > 1:
            raise HTTPException(status_code=400, detail="Multiple settings found")
        return settings[0]
    finally:
        # Release the session on every path; the "Multiple settings found"
        # branch previously raised without closing it.
        db.close()
@app.post("/settings", response_model=SettingsResponse)
def create_setting(
    settings: SettingsBase, current_user: User = Depends(get_current_user)
):
    """Create a settings row owned by the current user and return it."""
    # NOTE(review): nothing here prevents a second row for the same user,
    # after which GET /settings answers 400 "Multiple settings found".
    db = SessionLocal()
    try:
        new_setting = Settings(**settings.model_dump(), user_id=current_user.id)
        db.add(new_setting)
        db.commit()
        db.refresh(new_setting)
        return new_setting
    finally:
        # Release the session even if the commit raises.
        db.close()
@app.get("/settings/{settings_id}", response_model=SettingsResponse)
def read_setting(settings_id: int, current_user: User = Depends(get_current_user)):
    """Return one of the current user's settings rows by id.

    Raises:
        HTTPException: 404 if the row does not exist or belongs to another user.
    """
    db = SessionLocal()
    try:
        setting = (
            db.query(Settings)
            .filter(Settings.id == settings_id, Settings.user_id == current_user.id)
            .first()
        )
    finally:
        # Release the session even if the query raises.
        db.close()
    if not setting:
        raise HTTPException(status_code=404, detail="Setting not found")
    return setting
@app.put("/settings/{settings_id}", response_model=SettingsResponse)
def update_setting(
    settings_id: int,
    settings: SettingsUpdate,
    current_user: User = Depends(get_current_user),
):
    """Update one of the current user's settings rows.

    ``requirements`` and ``environment`` are only applied when the submitted
    value is non-empty and differs from the stored one; in that case
    ``update_requirements`` / ``update_environment`` is called with the
    submitted payload.

    Raises:
        HTTPException: 404 if the row does not exist or belongs to another user.
    """
    db = SessionLocal()
    try:
        existing_setting = (
            db.query(Settings)
            .filter(Settings.id == settings_id, Settings.user_id == current_user.id)
            .first()
        )
        if not existing_setting:
            raise HTTPException(status_code=404, detail="Setting not found")
        if (
            settings.requirements
            and existing_setting.requirements != settings.requirements
        ):
            existing_setting.requirements = settings.requirements
            update_requirements(settings)
        if (
            settings.environment
            and existing_setting.environment != settings.environment
        ):
            existing_setting.environment = settings.environment
            update_environment(settings)
        if settings.ntfy_url is not None:
            existing_setting.ntfy_url = settings.ntfy_url
        db.commit()
        db.refresh(existing_setting)
        return existing_setting
    finally:
        # Release the session on every path; the 404 branch previously raised
        # without closing it, and a failure in the helpers or the commit
        # leaked it as well.
        db.close()
@app.get("/script", response_model=list[ScriptResponse])
def read_scripts(current_user: User = Depends(get_current_user)):
    """Return all scripts owned by the current user."""
    db = SessionLocal()
    try:
        return db.query(Script).filter(Script.user_id == current_user.id).all()
    finally:
        # Release the session even if the query raises.
        db.close()
@app.post("/script", response_model=ScriptResponse)
def create_script(script: ScriptCreate, current_user: User = Depends(get_current_user)):
    """Create a script owned by the current user and return it."""
    db = SessionLocal()
    try:
        new_script = Script(
            name=script.name,
            script_content=script.script_content,
            user_id=current_user.id,
        )
        db.add(new_script)
        db.commit()
        db.refresh(new_script)
        return new_script
    finally:
        # Release the session even if the commit raises.
        db.close()
@app.get("/script/{script_id}", response_model=ScriptResponse)
def read_script(script_id: int, current_user: User = Depends(get_current_user)):
    """Return one of the current user's scripts by id.

    Raises:
        HTTPException: 404 if the script does not exist or belongs to another user.
    """
    db = SessionLocal()
    try:
        script = (
            db.query(Script)
            # Scope to the caller, as read_scripts does.
            .filter(Script.id == script_id, Script.user_id == current_user.id)
            .first()
        )
    finally:
        # Release the session even if the query raises.
        db.close()
    if not script:
        raise HTTPException(status_code=404, detail="Script not found")
    return script
@app.delete("/script/{script_id}")
def delete_script(script_id: int, current_user: User = Depends(get_current_user)):
    """Delete one of the current user's scripts together with its logs.

    Raises:
        HTTPException: 404 if the script does not exist or belongs to another user.
    """
    db = SessionLocal()
    try:
        script = (
            db.query(Script)
            # Only the owner may delete a script.
            .filter(Script.id == script_id, Script.user_id == current_user.id)
            .first()
        )
        if not script:
            raise HTTPException(status_code=404, detail="Script not found")
        # Remove the dependent logs before the script. With autoflush enabled
        # (the SQLAlchemy default), querying logs after db.delete(script)
        # would flush the script's DELETE while logs still reference it.
        logs = db.query(Log).filter(Log.script_id == script_id).all()
        for log in logs:
            db.delete(log)
        db.delete(script)
        db.commit()
        return {"message": "Script deleted"}
    finally:
        # Release the session on every path; the 404 branch previously raised
        # without closing it.
        db.close()
@app.put("/script/{script_id}", response_model=ScriptResponse)
def update_script(
    script_id: int, script: ScriptUpdate, current_user: User = Depends(get_current_user)
):
    """Replace the name, content and enabled flag of one of the user's scripts.

    Raises:
        HTTPException: 404 if the script does not exist or belongs to another user.
    """
    db = SessionLocal()
    try:
        existing_script = (
            db.query(Script)
            # Only the owner may modify a script.
            .filter(Script.id == script_id, Script.user_id == current_user.id)
            .first()
        )
        if not existing_script:
            raise HTTPException(status_code=404, detail="Script not found")
        existing_script.name = script.name
        existing_script.script_content = script.script_content
        existing_script.enabled = script.enabled
        db.commit()
        db.refresh(existing_script)
        return existing_script
    finally:
        # Release the session on every path; the 404 branch previously raised
        # without closing it.
        db.close()
@app.get("/script/{script_id}/log")
def get_script_logs(script_id: int, current_user: User = Depends(get_current_user)):
    """Return all log entries recorded for the given script."""
    # NOTE(review): the script's owner is not checked against current_user
    # here -- confirm whether that is intended.
    db = SessionLocal()
    try:
        return db.query(Log).filter(Log.script_id == script_id).all()
    finally:
        # Release the session even if the query raises.
        db.close()
@app.post("/script/{script_id}/log")
def create_script_log(
    script_id: int, log: ScriptLogCreate, current_user: User = Depends(get_current_user)
):
    """Store a log entry for the given script and return it."""
    # NOTE(review): neither the script's existence nor its owner is checked
    # here -- confirm whether that is intended.
    db = SessionLocal()
    try:
        new_log = Log(
            script_id=script_id,
            message=log.message,
            error_code=log.error_code,
            error_message=log.error_message,
        )
        db.add(new_log)
        db.commit()
        db.refresh(new_log)
        return new_log
    finally:
        # Release the session even if the commit raises.
        db.close()
@app.delete("/script/{script_id}/log/{log_id}")
def delete_script_log(
    script_id: int, log_id: int, current_user: User = Depends(get_current_user)
):
    """Delete one log entry of the given script.

    Raises:
        HTTPException: 404 if no log with this id exists for this script.
    """
    # NOTE(review): the script's owner is not checked against current_user
    # here -- confirm whether that is intended.
    db = SessionLocal()
    try:
        # Both conditions are passed as separate filter() arguments (SQL AND).
        # Python's `and` cannot combine column expressions: the previous
        # `a == b and c == d` reduced to just `Log.id == log_id`, so the
        # script_id in the URL was ignored.
        log = (
            db.query(Log)
            .filter(Log.id == log_id, Log.script_id == script_id)
            .first()
        )
        if not log:
            raise HTTPException(status_code=404, detail="Log not found")
        db.delete(log)
        db.commit()
        return {"message": "Log deleted"}
    finally:
        # Release the session on every path; the 404 branch previously raised
        # without closing it.
        db.close()
@app.post("/script/{script_id}/execute")
def execute_script(script_id: int, current_user: User = Depends(get_current_user)):
    """Run one of the current user's scripts via ``run_scripts``.

    Raises:
        HTTPException: 404 if the script does not exist or belongs to another user.
    """
    db = SessionLocal()
    try:
        # Only the owner may trigger execution; previously any authenticated
        # user could run any script id.
        owned = (
            db.query(Script.id)
            .filter(Script.id == script_id, Script.user_id == current_user.id)
            .first()
        )
    finally:
        db.close()
    if owned is None:
        raise HTTPException(status_code=404, detail="Script not found")
    run_scripts([script_id])
    return {"run_script": True}
# Health endpoint: unauthenticated, always answers with a fixed
# "healthy" status and touches no other state.
@app.get("/health")
def health_check():
    payload = {"status": "healthy"}
    return payload
# When the module is executed directly, serve the app with uvicorn on all
# interfaces (0.0.0.0), port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)