From 88ac296393ecb833b25369cce2436816172d5c13 Mon Sep 17 00:00:00 2001 From: MrZaiko Date: Sun, 20 Apr 2025 00:05:11 +0200 Subject: [PATCH] Add ntfy support --- api/ntfy.py | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 88 +++++++++++++++++++++++++++++++------ 2 files changed, 198 insertions(+), 14 deletions(-) create mode 100644 api/ntfy.py diff --git a/api/ntfy.py b/api/ntfy.py new file mode 100644 index 0000000..f086663 --- /dev/null +++ b/api/ntfy.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 + +import asyncio +import sqlite3 +import threading +import time +import requests +import sseclient +import json + + +class NtfyAPI: + def __init__(self, db_path, server, auth_token, authorized_user_id): + self.server = server + self.headers = { + "Authorization": f"Bearer {auth_token}", + "Accept": "application/json", + } + self.authorized_user_id = authorized_user_id + self.db_path = db_path + self.loop = asyncio.get_event_loop() + + db = sqlite3.connect(self.db_path) + c = db.cursor() + c.execute( + """ + CREATE TABLE IF NOT EXISTS subscriptions ( + topic TEXT PRIMARY KEY + )""" + ) + + db.commit() + db.close() + + def subscribe(self, topic): + url = f"{self.server}/{topic}" + try: + r = requests.get(url, stream=True, headers=self.auth_header) + client = sseclient.SSEClient(r) + for event in client.events(): + if event.data: + text = f"[{topic}] {event.data}" + app.bot.send_message(chat_id=self.authorized_user_id, text=text) + except Exception as e: + print(f"[ERROR] Topic {topic}: {e}") + time.sleep(5) + self.subscribe(topic) # retry + + def get_subscriptions(self): + db = sqlite3.connect(self.db_path) + c = db.cursor() + c.execute("SELECT topic FROM subscriptions") + topics = [row[0] for row in c.fetchall()] + db.close() + return topics + + def add_subscription(self, topic): + db = sqlite3.connect(self.db_path) + c = db.cursor() + c.execute("INSERT OR IGNORE INTO subscriptions (topic) VALUES (?)", (topic,)) + db.commit() + db.close() + + def remove_subscription(self, topic): + db = sqlite3.connect(self.db_path) + c = db.cursor() + c.execute("DELETE FROM subscriptions WHERE topic = ?", (topic,)) + db.commit() + db.close() + + def listen_topic(self, topic, app): + print(f"[INFO] Listening to topic: {topic}") + + url = f"{self.server}/{topic}/json" + try: + with requests.get(url, stream=True, headers=self.headers) as response: + for line in response.iter_lines(): + if line: + try: + message = json.loads(line.decode("utf-8")) + text = message.get("message", "") + title = message.get("title", "") + + if text: + text = f"[{topic}]\n{title}\n{text}\n" + text += "-" * 40 + "\n" + text += "/menu\n" + + asyncio.run_coroutine_threadsafe( + app.bot.send_message( + chat_id=self.authorized_user_id, + text=text, + ), + self.loop, + ) + except json.JSONDecodeError as e: + print(f"[ERROR] Failed to parse JSON: {e}") + except Exception as e: + print(f"[ERROR] Topic {topic}: {e}") + + print(f"[INFO] Stopped listening to topic: {topic}") + + # === NTFY LISTENER THREAD === + def ntfy_listener(self, app): + print("[INFO] Starting NTFY listener thread...") + + active_topics = {} + + while True: + topics = set(self.get_subscriptions()) + for topic in topics: + if topic not in active_topics: + print(f"[INFO] Starting thread for topic: {topic}") + t = threading.Thread( + target=self.listen_topic, + args=( + topic, + app, + ), + daemon=True, + ) + t.start() + active_topics[topic] = t + time.sleep(10) # refresh check diff --git a/main.py b/main.py index 789a665..838446a 100644 --- a/main.py +++ b/main.py @@ -5,10 +5,72 @@ from telegram.ext import Application, CommandHandler, CallbackQueryHandler, Cont from uptime_kuma_api import MonitorStatus import api.kuma as kuma - import api.torrent as torrent +import api.ntfy as ntfy + +import threading + TOKEN = "7396669954:AAH8_I0Y-qg3j_LfbUdRTOLPDKh80NdijMo" +AUTHORIZED_USER_ID = 634303772 # Replace with your Telegram user ID +NTFY_SERVER = "http://192.168.1.2:54720" +NTFY_AUTH_HEADER = "tk_cdmwd6ix255g3qgo4dx3r0gakw4y3" +DB_PATH = "subscriptions.db" + + +# --- Command Handlers --- +async def menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + try: + await update.message.reply_text( + "Choose an option:", reply_markup=main_menu_keyboard() + ) + except Exception as e: + await update.message.reply_text(f"An error occurred: {e}") + # Optionally log the error + print(f"Error: {e}") + + +async def info(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + user_id = update.message.from_user.id + await update.message.reply_text(f"Your UserID is {user_id}.") + + +async def subscribe(update: Update, context: ContextTypes.DEFAULT_TYPE): + n_api = context.bot_data.get("ntfy_api", {}) + + if update.effective_user.id != AUTHORIZED_USER_ID: + return + if not context.args: + await update.message.reply_text("Usage: /subscribe ") + return + topic = context.args[0] + n_api.add_subscription(topic) + await update.message.reply_text(f"Subscribed to topic: {topic}") + + +async def unsubscribe(update: Update, context: ContextTypes.DEFAULT_TYPE): + n_api = context.bot_data.get("ntfy_api", {}) + + if update.effective_user.id != AUTHORIZED_USER_ID: + return + if not context.args: + await update.message.reply_text("Usage: /unsubscribe ") + return + topic = context.args[0] + n_api.remove_subscription(topic) + await update.message.reply_text(f"Unsubscribed from topic: {topic}") + + +async def list_subs(update: Update, context: ContextTypes.DEFAULT_TYPE): + n_api = context.bot_data.get("ntfy_api", {}) + + if update.effective_user.id != AUTHORIZED_USER_ID: + return + topics = n_api.get_subscriptions() + if not topics: + await update.message.reply_text("You are not subscribed to any topics.") + else: + await update.message.reply_text("Subscribed topics:\n" + "\n".join(topics)) # --- Menu Definitions --- @@ -40,18 +102,6 @@ def status_menu_keyboard(): ) -# --- Command Handlers --- -async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - try: - await update.message.reply_text( - "Choose an option:", reply_markup=main_menu_keyboard() - ) - except Exception as e: - await update.message.reply_text(f"An error occurred: {e}") - # Optionally log the error - print(f"Error: {e}") - - def format_torrents(torrents): if len(torrents) == 0: return "No torrents." @@ -157,13 +207,23 @@ def main(): "http://192.168.1.17:8112", "tMHNjrJr7nhjyhJrYsahi4anq2h6LJ" ) + ntfy_api = ntfy.NtfyAPI(DB_PATH, NTFY_SERVER, NTFY_AUTH_HEADER, AUTHORIZED_USER_ID) + app = Application.builder().token(TOKEN).build() app.bot_data["kuma_api"] = kuma_api app.bot_data["torrent_api"] = torrent_api + app.bot_data["ntfy_api"] = ntfy_api - app.add_handler(CommandHandler("start", start)) + app.add_handler(CommandHandler("menu", menu)) app.add_handler(CallbackQueryHandler(handle_menu)) + app.add_handler(CommandHandler("info", info)) + app.add_handler(CommandHandler("subscribe", subscribe)) + app.add_handler(CommandHandler("unsubscribe", unsubscribe)) + app.add_handler(CommandHandler("list", list_subs)) + + threading.Thread(target=ntfy_api.ntfy_listener, args=(app,), daemon=True).start() + print("Bot is running... Press Ctrl+C to stop.") app.run_polling()