Files
Project-Monitor/backend/backend.py
2025-10-12 10:22:07 +02:00

235 lines
5.8 KiB
Python

from datetime import datetime
from fastapi import FastAPI
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from model import Log, SessionLocal, Script, Settings
from run_scripts import run_scripts, update_requirements, update_environment
import uvicorn
app = FastAPI()
# Update cors
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Define Pydantic models
class ScriptBase(BaseModel):
name: str
script_content: str
class ScriptCreate(ScriptBase):
pass
class ScriptUpdate(ScriptBase):
enabled: bool
class ScriptResponse(ScriptBase):
id: int
created_at: datetime
enabled: bool
model_config = {"from_attributes": True}
class ScriptLogCreate(BaseModel):
message: str
error_code: int
error_message: str
@app.get("/")
def hello():
return {"message": "Welcome to the Project Monitor API"}
# Define Pydantic models for Settings
class SettingsBase(BaseModel):
requirements: str
environment: str
user: str
class SettingsUpdate(SettingsBase):
pass
class SettingsResponse(SettingsBase):
id: int
model_config = {"from_attributes": True}
# Settings API Endpoints
@app.get("/settings", response_model=list[SettingsResponse])
def read_settings():
db = SessionLocal()
settings = db.query(Settings).all()
db.close()
return settings
@app.post("/settings", response_model=SettingsResponse)
def create_setting(settings: SettingsBase):
db = SessionLocal()
new_setting = Settings(**settings.model_dump())
db.add(new_setting)
db.commit()
db.refresh(new_setting)
db.close()
return new_setting
@app.get("/settings/{settings_id}", response_model=SettingsResponse)
def read_setting(settings_id: int):
db = SessionLocal()
setting = db.query(Settings).filter(Settings.id == settings_id).first()
db.close()
if not setting:
raise HTTPException(status_code=404, detail="Setting not found")
return setting
@app.put("/settings/{settings_id}", response_model=SettingsResponse)
def update_setting(settings_id: int, settings: SettingsUpdate):
db = SessionLocal()
existing_setting = db.query(Settings).filter(Settings.id == settings_id).first()
if not existing_setting:
raise HTTPException(status_code=404, detail="Setting not found")
if existing_setting.requirements != settings.requirements:
existing_setting.requirements = settings.requirements
update_requirements(settings)
if existing_setting.environment != settings.environment:
existing_setting.environment = settings.environment
update_environment(settings)
db.commit()
db.refresh(existing_setting)
db.close()
return existing_setting
@app.get("/script", response_model=list[ScriptResponse])
def read_scripts():
db = SessionLocal()
scripts = db.query(Script).all()
db.close()
return scripts
@app.post("/script", response_model=ScriptResponse)
def create_script(script: ScriptCreate):
db = SessionLocal()
new_script = Script(name=script.name, script_content=script.script_content)
db.add(new_script)
db.commit()
db.refresh(new_script)
db.close()
return new_script
@app.get("/script/{script_id}", response_model=ScriptResponse)
def read_script(script_id: int):
db = SessionLocal()
script = db.query(Script).filter(Script.id == script_id).first()
db.close()
if not script:
raise HTTPException(status_code=404, detail="Script not found")
return script
@app.delete("/script/{script_id}")
def delete_script(script_id: int):
db = SessionLocal()
script = db.query(Script).filter(Script.id == script_id).first()
if not script:
raise HTTPException(status_code=404, detail="Script not found")
db.delete(script)
logs = db.query(Log).filter(Log.script_id == script_id).all()
for log in logs:
db.delete(log)
db.commit()
db.close()
return {"message": "Script deleted"}
@app.put("/script/{script_id}", response_model=ScriptResponse)
def update_script(script_id: int, script: ScriptUpdate):
db = SessionLocal()
existing_script = db.query(Script).filter(Script.id == script_id).first()
if not existing_script:
raise HTTPException(status_code=404, detail="Script not found")
existing_script.name = script.name
existing_script.script_content = script.script_content
existing_script.enabled = script.enabled
db.commit()
db.refresh(existing_script)
db.close()
return existing_script
@app.get("/script/{script_id}/log")
def get_script_logs(script_id: int):
db = SessionLocal()
logs = db.query(Log).filter(Log.script_id == script_id).all()
db.close()
return logs
@app.post("/script/{script_id}/log")
def create_script_log(script_id: int, log: ScriptLogCreate):
db = SessionLocal()
new_log = Log(
script_id=script_id,
message=log.message,
error_code=log.error_code,
error_message=log.error_message,
)
db.add(new_log)
db.commit()
db.refresh(new_log)
db.close()
return new_log
@app.delete("/script/{script_id}/log/{log_id}")
def delete_script_log(script_id: int, log_id: int):
db = SessionLocal()
log = db.query(Log).filter(Log.id == log_id and Log.script_id == script_id).first()
if not log:
raise HTTPException(status_code=404, detail="Log not found")
db.delete(log)
db.commit()
db.close()
return {"message": "Log deleted"}
@app.post("/script/{script_id}/execute")
def execute_script(script_id: int):
run_scripts([script_id])
return {"run_script": True}
@app.get("/health")
def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)