124 lines
4.1 KiB
Python
124 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
import requests
|
|
import json
|
|
|
|
|
|
class NtfyAPI:
    """Forward messages from ntfy topics to one chat via ``app.bot.send_message``.

    The set of subscribed topics is persisted in a SQLite table named
    ``subscriptions``. Each topic is streamed on its own daemon thread, and
    every message is scheduled onto the asyncio event loop captured when the
    instance was constructed.
    """

    # Seconds to wait before `subscribe` reconnects after a failure.
    RETRY_DELAY_SECONDS = 5
    # Seconds between two reconciliation passes of `ntfy_listener`.
    REFRESH_INTERVAL_SECONDS = 10

    def __init__(self, db_path, server, auth_token, authorized_user_id):
        """Store the connection settings and make sure the table exists.

        Args:
            db_path: Path of the SQLite database file.
            server: Base URL of the ntfy server.
            auth_token: Token sent as ``Authorization: Bearer <token>``.
            authorized_user_id: ``chat_id`` every forwarded message is sent to.
        """
        self.server = server
        self.headers = {
            "Authorization": f"Bearer {auth_token}",
            "Accept": "application/json",
        }
        self.authorized_user_id = authorized_user_id
        self.db_path = db_path
        # Worker threads schedule coroutines onto this loop, so the instance
        # has to be created on the thread whose loop will run the bot.
        self.loop = asyncio.get_event_loop()

        self._execute(
            """
            CREATE TABLE IF NOT EXISTS subscriptions (
                topic TEXT PRIMARY KEY
            )"""
        )

    def _execute(self, sql, params=()):
        """Run one statement on a short-lived connection and return all rows."""
        # A fresh connection per call: by default a sqlite3 connection may
        # only be used from the thread that created it, and these methods are
        # called from several threads.
        db = sqlite3.connect(self.db_path)
        try:
            with db:  # commits on success, rolls back on error
                return db.execute(sql, params).fetchall()
        finally:
            # `with db` does not close the connection, so do it explicitly
            # even when the statement raised.
            db.close()

    def subscribe(self, topic, app=None):
        """Stream ``topic`` and forward its messages, reconnecting on errors.

        Blocks the calling thread. Returns only when the server ends the
        stream without an error; after any failure it waits
        ``RETRY_DELAY_SECONDS`` and connects again.

        Args:
            topic: Name of the ntfy topic.
            app: Object exposing ``bot.send_message``; required.

        Raises:
            ValueError: If ``app`` is not supplied.
        """
        if app is None:
            raise ValueError(
                "subscribe() requires the bot application to forward messages"
            )

        # Iterative retry: a recursive retry would eventually exhaust the
        # interpreter's recursion limit on a long outage.
        while True:
            try:
                self._stream_topic(topic, app)
                return
            except Exception as e:  # boundary of a long-running worker
                print(f"[ERROR] Topic {topic}: {e}")
                time.sleep(self.RETRY_DELAY_SECONDS)

    def get_subscriptions(self):
        """Return the list of subscribed topic names."""
        rows = self._execute("SELECT topic FROM subscriptions")
        return [row[0] for row in rows]

    def add_subscription(self, topic):
        """Persist ``topic``; adding an existing topic is a no-op."""
        self._execute(
            "INSERT OR IGNORE INTO subscriptions (topic) VALUES (?)", (topic,)
        )

    def remove_subscription(self, topic):
        """Delete ``topic`` from the persisted subscriptions, if present."""
        self._execute("DELETE FROM subscriptions WHERE topic = ?", (topic,))

    @staticmethod
    def _format_message(topic, message):
        """Build the chat text for one decoded stream event.

        Returns ``None`` when the event is not a JSON object or carries no
        non-empty ``"message"`` field, i.e. when nothing should be forwarded.
        """
        if not isinstance(message, dict):
            return None
        body = message.get("message", "")
        if not body:
            return None
        title = message.get("title", "")
        return f"[{topic}]\n{title}\n{body}\n" + "-" * 40 + "\n" + "/menu\n"

    @staticmethod
    def _report_send_failure(future):
        """Print the error of a failed ``send_message`` instead of losing it."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            print(f"[ERROR] Failed to forward message: {error}")

    def _stream_topic(self, topic, app, stop_event=None):
        """Consume one streaming connection for ``topic`` until it ends.

        Raises whatever the HTTP layer raises; callers decide how to report
        it. Returns early once ``stop_event`` (if given) is set.
        """
        url = f"{self.server.rstrip('/')}/{topic}/json"
        with requests.get(url, stream=True, headers=self.headers) as response:
            # Surface 4xx/5xx replies instead of parsing the error body as if
            # it were a stream of events.
            response.raise_for_status()
            for line in response.iter_lines():
                # Checked before the line is handled so that a stopped
                # listener never forwards another message.
                if stop_event is not None and stop_event.is_set():
                    return
                if not line:
                    continue
                try:
                    message = json.loads(line.decode("utf-8"))
                except (UnicodeDecodeError, json.JSONDecodeError) as e:
                    print(f"[ERROR] Failed to parse JSON: {e}")
                    continue

                text = self._format_message(topic, message)
                if text is None:
                    continue

                # This runs on a worker thread, so the coroutine is handed
                # over to the loop captured in __init__.
                future = asyncio.run_coroutine_threadsafe(
                    app.bot.send_message(
                        chat_id=self.authorized_user_id,
                        text=text,
                    ),
                    self.loop,
                )
                future.add_done_callback(self._report_send_failure)

    def listen_topic(self, topic, app, stop_event=None):
        """Stream ``topic`` once, printing (not raising) any failure.

        Args:
            topic: Name of the ntfy topic.
            app: Object exposing ``bot.send_message``.
            stop_event: Optional ``threading.Event``; once set, the listener
                returns when the next line arrives from the stream.
        """
        print(f"[INFO] Listening to topic: {topic}")

        try:
            self._stream_topic(topic, app, stop_event)
        except Exception as e:  # thread boundary: report, never propagate
            print(f"[ERROR] Topic {topic}: {e}")

        print(f"[INFO] Stopped listening to topic: {topic}")

    def _sync_listeners(self, app, active_topics):
        """Make the running listener threads match the stored subscriptions.

        ``active_topics`` maps a topic to its ``(thread, stop_event)`` pair
        and is updated in place.
        """
        wanted = set(self.get_subscriptions())

        for topic, (thread, stop_event) in list(active_topics.items()):
            if topic not in wanted:
                # Cooperative stop: the listener notices on the next line it
                # receives (presumably bounded by server keepalive events --
                # confirm against the ntfy deployment).
                stop_event.set()
                del active_topics[topic]
            elif not thread.is_alive():
                # The listener exited (error or closed stream); forget it so
                # the loop below starts a replacement.
                del active_topics[topic]

        for topic in wanted:
            if topic in active_topics:
                continue
            print(f"[INFO] Starting thread for topic: {topic}")
            stop_event = threading.Event()
            thread = threading.Thread(
                target=self.listen_topic,
                args=(topic, app, stop_event),
                daemon=True,
            )
            thread.start()
            active_topics[topic] = (thread, stop_event)

    # === NTFY LISTENER THREAD ===
    def ntfy_listener(self, app):
        """Supervise one listener thread per subscribed topic, forever.

        Every ``REFRESH_INTERVAL_SECONDS`` the stored subscriptions are
        re-read: new topics get a listener, dead listeners are restarted and
        listeners of removed topics are told to stop.
        """
        print("[INFO] Starting NTFY listener thread...")

        active_topics = {}

        while True:
            try:
                self._sync_listeners(app, active_topics)
            except sqlite3.Error as e:
                # A transient database error must not end supervision; the
                # next pass simply tries again.
                print(f"[ERROR] Failed to refresh subscriptions: {e}")
            time.sleep(self.REFRESH_INTERVAL_SECONDS)  # refresh check
|