"""Run scripts stored in the database and log the outcome of each run."""

import os
import subprocess

import model


def run_scripts(script_ids: list[int] | None = None):
    """Execute stored scripts and record one ``model.Log`` row per run.

    Each script is written to ``exec_folder/<name>.py``, executed in a
    subprocess, logged, and then removed from disk again.

    Args:
        script_ids: IDs of the scripts to run. When ``None`` or empty, every
            script whose ``enabled`` column is truthy is run instead.
    """
    db = model.SessionLocal()
    try:
        if script_ids:
            scripts = (
                db.query(model.Script)
                .filter(model.Script.id.in_(script_ids))
                .all()
            )
        else:
            scripts = db.query(model.Script).filter(model.Script.enabled).all()

        for script in scripts:
            print(f"Running script: {script.name}")
            # NOTE(review): script.name is interpolated into the path as-is;
            # confirm names cannot contain path separators.
            script_path = f"exec_folder/{script.name}.py"
            try:
                dump_script_to_file(script, script_path)
                result = execute_script(script_path)
                db.add(
                    model.Log(
                        script_id=script.id,
                        error_code=result.returncode,
                        message=result.stdout,
                        error_message=result.stderr,
                    )
                )
                db.commit()
            finally:
                # Remove the temporary file even when dumping, executing or
                # committing fails, so no stale scripts are left behind.
                delete_script(script_path)
    finally:
        # Always release the session, including when a script run raises.
        db.close()


def dump_script_to_file(script, filename):
    """Write ``script.script_content`` to ``filename``, replacing any old file.

    The parent directory is created when it does not exist yet.

    Args:
        script: Object exposing the source text as ``script_content``.
        filename: Destination path of the script file.
    """
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)
    # Python reads source files as UTF-8 by default, so write UTF-8 explicitly
    # instead of relying on the platform's default encoding.
    with open(filename, "w", encoding="utf-8") as file:
        file.write(script.script_content)


def execute_script(filename) -> subprocess.CompletedProcess:
    """Run ``filename`` with the ``python`` executable and capture its output.

    Args:
        filename: Path of the script file to execute.

    Returns:
        The completed process; ``stdout`` and ``stderr`` are captured as text.
    """
    # NOTE(review): "python" is resolved via PATH; consider sys.executable if
    # the scripts must run under the same interpreter as this module.
    return subprocess.run(["python", filename], capture_output=True, text=True)


def delete_script(filename):
    """Delete ``filename``; a file that is already gone is not an error."""
    try:
        os.remove(filename)
    except FileNotFoundError:
        # Best-effort cleanup: nothing to do when the file does not exist.
        pass


def main():
    """Run every enabled script."""
    run_scripts()


if __name__ == "__main__":
    main()