import os import requests from datetime import datetime from model import SessionLocal, Subscription, Settings, Notification import json # Constants NTFY_TOKEN = os.getenv("NTFY_TOKEN") def fetch_ntfy_notifications(base_url, subscriptions): """Fetch notifications from the ntfy.sh server for the given subscriptions using streaming.""" headers = { "Accept": "application/json", "Authorization": f"Bearer {NTFY_TOKEN}" if NTFY_TOKEN else None, } notifications = [] for subscription in subscriptions: print(f"Fetching notifications for {subscription.topic}") topic = subscription.topic last_message_id = subscription.last_message_id since_param = "all" if last_message_id is None else last_message_id url = f"{base_url}/{topic}/json?poll=1&since={since_param}" response = requests.get(url, headers=headers, stream=True) response.raise_for_status() for line in response.iter_lines(): if line: notification = json.loads(line) if notification.get("event") == "message": notifications.append(notification) print(f"Fetched {len(notifications)} notifications") print(notifications) return notifications def save_notifications_to_db(notifications, topic_to_subscription, db): """Save the fetched notifications to the database and update last_message_id.""" db = SessionLocal() last_message_ids = {} for notification in notifications: topic = notification["topic"] last_message_ids[topic] = notification["id"] subscription_id = topic_to_subscription.get(notification["topic"]) if subscription_id: new_notification = Notification( title=notification.get("title", "No Title"), message=notification.get("message", ""), priority=notification.get("priority", 3), created_at=datetime.fromtimestamp(notification["time"]), subscription_id=subscription_id, ) db.add(new_notification) for topic, message_id in last_message_ids.items(): subscription_id = topic_to_subscription.get(topic) if subscription_id: subscription = ( db.query(Subscription) .filter(Subscription.id == subscription_id) .first() ) if subscription: subscription.last_message_id = message_id db.commit() db.close() def main(): """Main function to fetch and save notifications.""" db = SessionLocal() # Get the ntfy base URL from settings settings = db.query(Settings).filter(Settings.user == "default").first() if not settings: print("Default user settings not found.") return ntfy_url = settings.ntfy_url if not ntfy_url: print("Ntfy URL not found in settings.") return # Get all subscribed topics subscriptions = db.query(Subscription).all() topic_to_subscription = { subscription.topic: subscription.id for subscription in subscriptions } db.close() # Fetch notifications from ntfy.sh notifications = fetch_ntfy_notifications(ntfy_url, subscriptions) # Save notifications to the database save_notifications_to_db(notifications, topic_to_subscription, db) if __name__ == "__main__": main()