#!/usr/bin/env python3 import asyncio import sqlite3 import threading import time import requests import json class NtfyAPI: def __init__(self, db_path, server, auth_token, authorized_user_id): self.server = server self.headers = { "Authorization": f"Bearer {auth_token}", "Accept": "application/json", } self.authorized_user_id = authorized_user_id self.db_path = db_path self.loop = asyncio.get_event_loop() db = sqlite3.connect(self.db_path) c = db.cursor() c.execute( """ CREATE TABLE IF NOT EXISTS subscriptions ( topic TEXT PRIMARY KEY )""" ) db.commit() db.close() def subscribe(self, topic): url = f"{self.server}/{topic}" try: r = requests.get(url, stream=True, headers=self.auth_header) client = sseclient.SSEClient(r) for event in client.events(): if event.data: text = f"[{topic}] {event.data}" app.bot.send_message(chat_id=self.authorized_user_id, text=text) except Exception as e: print(f"[ERROR] Topic {topic}: {e}") time.sleep(5) self.subscribe(topic) # retry def get_subscriptions(self): db = sqlite3.connect(self.db_path) c = db.cursor() c.execute("SELECT topic FROM subscriptions") topics = [row[0] for row in c.fetchall()] db.close() return topics def add_subscription(self, topic): db = sqlite3.connect(self.db_path) c = db.cursor() c.execute("INSERT OR IGNORE INTO subscriptions (topic) VALUES (?)", (topic,)) db.commit() db.close() def remove_subscription(self, topic): db = sqlite3.connect(self.db_path) c = db.cursor() c.execute("DELETE FROM subscriptions WHERE topic = ?", (topic,)) db.commit() db.close() def listen_topic(self, topic, app): print(f"[INFO] Listening to topic: {topic}") url = f"{self.server}/{topic}/json" try: with requests.get(url, stream=True, headers=self.headers) as response: for line in response.iter_lines(): if line: try: message = json.loads(line.decode("utf-8")) text = message.get("message", "") title = message.get("title", "") if text: text = f"[{topic}]\n{title}\n{text}\n" text += "-" * 40 + "\n" text += "/menu\n" asyncio.run_coroutine_threadsafe( app.bot.send_message( chat_id=self.authorized_user_id, text=text, ), self.loop, ) except json.JSONDecodeError as e: print(f"[ERROR] Failed to parse JSON: {e}") except Exception as e: print(f"[ERROR] Topic {topic}: {e}") print(f"[INFO] Stopped listening to topic: {topic}") # === NTFY LISTENER THREAD === def ntfy_listener(self, app): print("[INFO] Starting NTFY listener thread...") active_topics = {} while True: topics = set(self.get_subscriptions()) for topic in topics: if topic not in active_topics: print(f"[INFO] Starting thread for topic: {topic}") t = threading.Thread( target=self.listen_topic, args=( topic, app, ), daemon=True, ) t.start() active_topics[topic] = t time.sleep(10) # refresh check