This commit is contained in:
124
api/ntfy.py
Normal file
124
api/ntfy.py
Normal file
@@ -0,0 +1,124 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
import requests
|
||||
import sseclient
|
||||
import json
|
||||
|
||||
|
||||
class NtfyAPI:
|
||||
def __init__(self, db_path, server, auth_token, authorized_user_id):
|
||||
self.server = server
|
||||
self.headers = {
|
||||
"Authorization": f"Bearer {auth_token}",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
self.authorized_user_id = authorized_user_id
|
||||
self.db_path = db_path
|
||||
self.loop = asyncio.get_event_loop()
|
||||
|
||||
db = sqlite3.connect(self.db_path)
|
||||
c = db.cursor()
|
||||
c.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS subscriptions (
|
||||
topic TEXT PRIMARY KEY
|
||||
)"""
|
||||
)
|
||||
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
def subscribe(self, topic):
|
||||
url = f"{self.server}/{topic}"
|
||||
try:
|
||||
r = requests.get(url, stream=True, headers=self.auth_header)
|
||||
client = sseclient.SSEClient(r)
|
||||
for event in client.events():
|
||||
if event.data:
|
||||
text = f"[{topic}] {event.data}"
|
||||
app.bot.send_message(chat_id=self.authorized_user_id, text=text)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Topic {topic}: {e}")
|
||||
time.sleep(5)
|
||||
self.subscribe(topic) # retry
|
||||
|
||||
def get_subscriptions(self):
|
||||
db = sqlite3.connect(self.db_path)
|
||||
c = db.cursor()
|
||||
c.execute("SELECT topic FROM subscriptions")
|
||||
topics = [row[0] for row in c.fetchall()]
|
||||
db.close()
|
||||
return topics
|
||||
|
||||
def add_subscription(self, topic):
|
||||
db = sqlite3.connect(self.db_path)
|
||||
c = db.cursor()
|
||||
c.execute("INSERT OR IGNORE INTO subscriptions (topic) VALUES (?)", (topic,))
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
def remove_subscription(self, topic):
|
||||
db = sqlite3.connect(self.db_path)
|
||||
c = db.cursor()
|
||||
c.execute("DELETE FROM subscriptions WHERE topic = ?", (topic,))
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
def listen_topic(self, topic, app):
|
||||
print(f"[INFO] Listening to topic: {topic}")
|
||||
|
||||
url = f"{self.server}/{topic}/json"
|
||||
try:
|
||||
with requests.get(url, stream=True, headers=self.headers) as response:
|
||||
for line in response.iter_lines():
|
||||
if line:
|
||||
try:
|
||||
message = json.loads(line.decode("utf-8"))
|
||||
text = message.get("message", "")
|
||||
title = message.get("title", "")
|
||||
|
||||
if text:
|
||||
text = f"[{topic}]\n{title}\n{text}\n"
|
||||
text += "-" * 40 + "\n"
|
||||
text += "/menu\n"
|
||||
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
app.bot.send_message(
|
||||
chat_id=self.authorized_user_id,
|
||||
text=text,
|
||||
),
|
||||
self.loop,
|
||||
)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"[ERROR] Failed to parse JSON: {e}")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Topic {topic}: {e}")
|
||||
|
||||
print(f"[INFO] Stopped listening to topic: {topic}")
|
||||
|
||||
# === NTFY LISTENER THREAD ===
|
||||
def ntfy_listener(self, app):
|
||||
print("[INFO] Starting NTFY listener thread...")
|
||||
|
||||
active_topics = {}
|
||||
|
||||
while True:
|
||||
topics = set(self.get_subscriptions())
|
||||
for topic in topics:
|
||||
if topic not in active_topics:
|
||||
print(f"[INFO] Starting thread for topic: {topic}")
|
||||
t = threading.Thread(
|
||||
target=self.listen_topic,
|
||||
args=(
|
||||
topic,
|
||||
app,
|
||||
),
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
active_topics[topic] = t
|
||||
time.sleep(10) # refresh check
|
||||
Reference in New Issue
Block a user