"""Poll ntfy topics for every configured user and store new messages in the database."""

import json
import os
from datetime import datetime

import requests

from model import Notification, SessionLocal, Settings, Subscription

# Constants
NTFY_TOKEN = os.getenv("NTFY_TOKEN")
# Upper bound, in seconds, for connecting to the server and for each read on
# the response. Without it, requests waits indefinitely on a stalled server.
REQUEST_TIMEOUT_SECONDS = 30


def fetch_ntfy_notifications(base_url, subscriptions):
    """Fetch notifications from the ntfy.sh server for the given subscriptions using streaming.

    Args:
        base_url: Base URL of the ntfy server; each topic is appended to it
            as a path segment.
        subscriptions: Iterable of objects exposing ``topic`` and
            ``last_message_id`` attributes.

    Returns:
        A list of decoded JSON objects (dicts) whose ``event`` field equals
        ``"message"``, collected across all subscriptions in iteration order.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    headers = {"Accept": "application/json"}
    # Only send the Authorization header when a token is actually configured.
    if NTFY_TOKEN:
        headers["Authorization"] = f"Bearer {NTFY_TOKEN}"

    notifications = []
    for subscription in subscriptions:
        print(f"Fetching notifications for {subscription.topic}")
        topic = subscription.topic
        last_message_id = subscription.last_message_id
        # With no stored message id, request the full history ("all");
        # otherwise resume after the last message already seen.
        since_param = "all" if last_message_id is None else last_message_id
        url = f"{base_url}/{topic}/json?poll=1&since={since_param}"

        # A streamed response keeps its connection open until it is closed,
        # so the context manager releases it even if parsing fails midway.
        with requests.get(
            url,
            headers=headers,
            stream=True,
            timeout=REQUEST_TIMEOUT_SECONDS,
        ) as response:
            response.raise_for_status()
            # The body is read as one JSON document per line.
            for line in response.iter_lines():
                if not line:
                    continue
                notification = json.loads(line)
                # Keep only entries whose event type is "message".
                if notification.get("event") == "message":
                    notifications.append(notification)

    print(f"Fetched {len(notifications)} notifications")
    return notifications


def save_notifications_to_db(notifications, topic_to_subscription, db):
    """Save the fetched notifications to the database and update last_message_id.

    Args:
        notifications: Iterable of notification dicts; each must contain
            ``topic``, ``id`` and ``time`` and may contain ``title``,
            ``message`` and ``priority``.
        topic_to_subscription: Mapping from topic name to subscription id.
        db: Database session used to add rows, query subscriptions and commit.

    Notifications whose topic is not in ``topic_to_subscription`` are not
    stored. All changes are committed in a single transaction at the end.
    """
    # Later notifications overwrite earlier ones, so each topic ends up
    # mapped to the id of the last notification seen for it.
    last_message_ids = {}
    for notification in notifications:
        topic = notification["topic"]
        last_message_ids[topic] = notification["id"]
        subscription_id = topic_to_subscription.get(notification["topic"])
        if subscription_id:
            new_notification = Notification(
                title=notification.get("title", "No Title"),
                message=notification.get("message", ""),
                priority=notification.get("priority", 3),
                # NOTE(review): fromtimestamp() without a tz yields a naive
                # datetime in the machine's local time zone - confirm that is
                # what the created_at column expects.
                created_at=datetime.fromtimestamp(notification["time"]),
                subscription_id=subscription_id,
            )
            db.add(new_notification)

    # Record the resume point on each subscription that received messages.
    for topic, message_id in last_message_ids.items():
        subscription_id = topic_to_subscription.get(topic)
        if subscription_id:
            subscription = (
                db.query(Subscription)
                .filter(Subscription.id == subscription_id)
                .first()
            )
            if subscription:
                subscription.last_message_id = message_id

    db.commit()


def process_user_notifications(user_settings, db):
    """Process notifications for a specific user's subscriptions.

    Args:
        user_settings: Settings object exposing ``ntfy_url`` and ``user_id``.
        db: Database session used for querying and saving.

    Users without an ``ntfy_url`` are skipped with a printed message.
    """
    ntfy_url = user_settings.ntfy_url
    if not ntfy_url:
        print(f"Ntfy URL not found for user ID {user_settings.user_id}. Skipping...")
        return

    # Get all subscriptions for the user
    subscriptions = (
        db.query(Subscription)
        .filter(Subscription.user_id == user_settings.user_id)
        .all()
    )
    topic_to_subscription = {
        subscription.topic: subscription.id for subscription in subscriptions
    }

    # Fetch notifications from ntfy.sh
    notifications = fetch_ntfy_notifications(ntfy_url, subscriptions)

    # Save notifications to the database
    save_notifications_to_db(notifications, topic_to_subscription, db)


def main():
    """Main function to fetch and save notifications for all users."""
    db = SessionLocal()
    # try/finally guarantees the session is closed on the early return below
    # and when processing a user raises.
    try:
        # Get all user settings
        user_settings_list = db.query(Settings).all()
        if not user_settings_list:
            print("No user settings found.")
            return

        # Process notifications for each user
        for user_settings in user_settings_list:
            print(f"Processing notifications for user ID {user_settings.user_id}")
            process_user_notifications(user_settings, db)
    finally:
        db.close()


if __name__ == "__main__":
    main()