#!/usr/bin/env python3 from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update from telegram.ext import Application, CommandHandler, CallbackQueryHandler, ContextTypes from uptime_kuma_api import MonitorStatus import api.kuma as kuma import api.torrent as torrent import api.ntfy as ntfy import threading TOKEN = "7396669954:AAH8_I0Y-qg3j_LfbUdRTOLPDKh80NdijMo" AUTHORIZED_USER_ID = 634303772 # Replace with your Telegram user ID NTFY_SERVER = "http://192.168.1.2:54720" NTFY_AUTH_HEADER = "tk_cdmwd6ix255g3qgo4dx3r0gakw4y3" DB_PATH = "subscriptions.db" # --- Command Handlers --- async def menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: try: await update.message.reply_text( "Choose an option:", reply_markup=main_menu_keyboard() ) except Exception as e: await update.message.reply_text(f"An error occurred: {e}") # Optionally log the error print(f"Error: {e}") async def info(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.message.from_user.id await update.message.reply_text(f"Your UserID is {user_id}.") async def subscribe(update: Update, context: ContextTypes.DEFAULT_TYPE): n_api = context.bot_data.get("ntfy_api", {}) if update.effective_user.id != AUTHORIZED_USER_ID: return if not context.args: await update.message.reply_text("Usage: /subscribe ") return topic = context.args[0] n_api.add_subscription(topic) await update.message.reply_text(f"Subscribed to topic: {topic}") async def unsubscribe(update: Update, context: ContextTypes.DEFAULT_TYPE): n_api = context.bot_data.get("ntfy_api", {}) if update.effective_user.id != AUTHORIZED_USER_ID: return if not context.args: await update.message.reply_text("Usage: /unsubscribe ") return topic = context.args[0] n_api.remove_subscription(topic) await update.message.reply_text(f"Unsubscribed from topic: {topic}") async def list_subs(update: Update, context: ContextTypes.DEFAULT_TYPE): n_api = context.bot_data.get("ntfy_api", {}) if update.effective_user.id != AUTHORIZED_USER_ID: return topics = n_api.get_subscriptions() if not topics: await update.message.reply_text("You are not subscribed to any topics.") else: await update.message.reply_text("Subscribed topics:\n" + "\n".join(topics)) # --- Menu Definitions --- def main_menu_keyboard(): return InlineKeyboardMarkup( [ [InlineKeyboardButton("Torrents", callback_data="status_downloading")], [InlineKeyboardButton("Status", callback_data="menu_status")], ] ) def torrents_menu_keyboard(): return InlineKeyboardMarkup( [ # First row: Display the status counts for downloading, paused, seeding [InlineKeyboardButton(f"Downloading", callback_data="status_downloading")], [InlineKeyboardButton(f"Active", callback_data="status_active")], [InlineKeyboardButton(f"All", callback_data="status_all")], # Second row: Back button [InlineKeyboardButton("πŸ”™ Back", callback_data="menu_main")], ] ) def status_menu_keyboard(): return InlineKeyboardMarkup( [[InlineKeyboardButton("πŸ”™ Back", callback_data="menu_main")]] ) def format_torrents(torrents): if len(torrents) == 0: return "No torrents." text = "" i = 0 for torrent in torrents: if i > 5: text += "...\n" return text text += f"Name: {torrent['name']}\n" text += f"State: {torrent['state']}\n" text += f"Progress: {torrent['progress']:.2f}%\n" text += f"ETA: {torrent['eta']}\n" text += "-" * 20 + "\n" text += f"- {torrent['name']} - {torrent['progress']} ({torrent['eta']})\n" i += 1 return text # --- Callback Query Handler --- async def handle_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: query = update.callback_query await query.answer() match query.data: case "menu_main": await query.edit_message_text( "Choose an option:", reply_markup=main_menu_keyboard() ) case "status_downloading": t_api = context.bot_data.get("torrent_api", {}) torrents = t_api.get_filtered_torrents("downloading") if len(torrents) == 0: text = "No downloading torrents." else: text = format_torrents(torrents) await query.edit_message_text(text, reply_markup=torrents_menu_keyboard()) case "status_active": t_api = context.bot_data.get("torrent_api", {}) torrents = t_api.get_filtered_torrents("active") if len(torrents) == 0: text = "No active torrents." else: text = format_torrents(torrents) await query.edit_message_text(text, reply_markup=torrents_menu_keyboard()) case "status_all": t_api = context.bot_data.get("torrent_api", {}) torrents = t_api.get_filtered_torrents("all") if len(torrents) == 0: text = "No torrents." else: text = format_torrents(torrents) await query.edit_message_text(text, reply_markup=torrents_menu_keyboard()) case "menu_status": k_api = context.bot_data.get("kuma_api", {}) monitors = k_api.get_status() up_text, down_text, paused_text = "", "", "" for _, monitor in monitors.items(): status = monitor["status"] if status == MonitorStatus.UP: up_text += f" - {monitor['name']}\n" elif status == MonitorStatus.DOWN: down_text += f" - {monitor['name']}\n" else: paused_text += f" - {monitor['name']}\n" status_text = f"πŸ“‘ *Status:*\n\n 🟒 Up:\n{up_text}\nπŸ”΄ Down\n{down_text}\n⏸️ Paused\n{paused_text}" await query.edit_message_text( status_text, reply_markup=status_menu_keyboard() ) case _: await query.edit_message_text( "Unknown option selected.", reply_markup=main_menu_keyboard() ) # --- Main Function --- def main(): print("Starting Jarvis...") # Initiate api kuma_api = kuma.KumaAPI("http://192.168.1.2:36667", "k!PTfyvoIJho9o*gX6F1") torrent_api = torrent.TorrentApi( "http://192.168.1.17:8112", "tMHNjrJr7nhjyhJrYsahi4anq2h6LJ", username="MrZaiko" ) ntfy_api = ntfy.NtfyAPI(DB_PATH, NTFY_SERVER, NTFY_AUTH_HEADER, AUTHORIZED_USER_ID) app = Application.builder().token(TOKEN).build() app.bot_data["kuma_api"] = kuma_api app.bot_data["torrent_api"] = torrent_api app.bot_data["ntfy_api"] = ntfy_api app.add_handler(CommandHandler("menu", menu)) app.add_handler(CallbackQueryHandler(handle_menu)) app.add_handler(CommandHandler("info", info)) app.add_handler(CommandHandler("subscribe", subscribe)) app.add_handler(CommandHandler("unsubscribe", unsubscribe)) app.add_handler(CommandHandler("list", list_subs)) threading.Thread(target=ntfy_api.ntfy_listener, args=(app,), daemon=True).start() print("Bot is running... Press Ctrl+C to stop.") app.run_polling() if __name__ == "__main__": main()